http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java index 7d28d52..053b2a8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterDataSerializers.java @@ -37,22 +37,25 @@ public class RegisterDataSerializers extends BaseCommand { private RegisterDataSerializers() {} - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException { if (logger.isDebugEnabled()) { logger.debug("{}: Received register dataserializer request ({} parts) from {}", - servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString()); + serverConnection.getName(), clientMessage.getNumberOfParts(), + serverConnection.getSocketString()); } - int noOfParts = msg.getNumberOfParts(); + int noOfParts = clientMessage.getNumberOfParts(); // 2 parts per instantiator and one eventId part int noOfDataSerializers = (noOfParts - 1) / 2; // retrieve eventID from the last Part - ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(msg.getPart(noOfParts - 1).getSerializedForm()); + ByteBuffer eventIdPartsBuffer = + ByteBuffer.wrap(clientMessage.getPart(noOfParts - 1).getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); - EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId); + EventID eventId = + new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId); byte[][] serializedDataSerializers = new byte[noOfDataSerializers * 2][]; boolean caughtCNFE = false; @@ -60,12 +63,12 @@ public class RegisterDataSerializers extends BaseCommand { try { for (int i = 0; i < noOfParts - 1; i = i + 2) { - Part dataSerializerClassNamePart = msg.getPart(i); + Part dataSerializerClassNamePart = clientMessage.getPart(i); serializedDataSerializers[i] = dataSerializerClassNamePart.getSerializedForm(); String dataSerializerClassName = (String) CacheServerHelper.deserialize(serializedDataSerializers[i]); - Part idPart = msg.getPart(i + 1); + Part idPart = clientMessage.getPart(i + 1); serializedDataSerializers[i + 1] = idPart.getSerializedForm(); int id = idPart.getInt(); @@ -73,7 +76,7 @@ public class RegisterDataSerializers extends BaseCommand { try { dataSerializerClass = InternalDataSerializer.getCachedClass(dataSerializerClassName); InternalDataSerializer.register(dataSerializerClass, true, eventId, - servConn.getProxyID()); + serverConnection.getProxyID()); } catch (ClassNotFoundException e) { // If a ClassNotFoundException is caught, store it, but continue // processing other instantiators @@ -82,26 +85,27 @@ public class RegisterDataSerializers extends BaseCommand { } } } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); } // If a ClassNotFoundException was caught while processing the // instantiators, send it back to the client. Note: This only sends // the last CNFE. if (caughtCNFE) { - writeException(msg, cnfe, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, cnfe, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); } // Send reply to client if necessary. If an exception occurs in the above // code, then the reply has already been sent. - if (!servConn.getTransientFlag(RESPONDED)) { - writeReply(msg, servConn); + if (!serverConnection.getTransientFlag(RESPONDED)) { + writeReply(clientMessage, serverConnection); } if (logger.isDebugEnabled()) { - logger.debug("Registered dataserializer for MembershipId = {}", servConn.getMembershipID()); + logger.debug("Registered dataserializer for MembershipId = {}", + serverConnection.getMembershipID()); } } }
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java index 1e701fc..df5a46c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInstantiators.java @@ -49,23 +49,26 @@ public class RegisterInstantiators extends BaseCommand { private RegisterInstantiators() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException { if (logger.isDebugEnabled()) { logger.debug("{}: Received register instantiator request ({} parts) from {}", - servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString()); + serverConnection.getName(), clientMessage.getNumberOfParts(), + serverConnection.getSocketString()); } - int noOfParts = msg.getNumberOfParts(); + int noOfParts = clientMessage.getNumberOfParts(); // Assert parts Assert.assertTrue((noOfParts - 1) % 3 == 0); // 3 parts per instantiator and one eventId part int noOfInstantiators = (noOfParts - 1) / 3; // retrieve eventID from the last Part - ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(msg.getPart(noOfParts - 1).getSerializedForm()); + ByteBuffer eventIdPartsBuffer = + ByteBuffer.wrap(clientMessage.getPart(noOfParts - 1).getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); - EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId); + EventID eventId = + new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId); byte[][] serializedInstantiators = new byte[noOfInstantiators * 3][]; boolean caughtCNFE = false; @@ -73,17 +76,17 @@ public class RegisterInstantiators extends BaseCommand { try { for (int i = 0; i < noOfParts - 1; i = i + 3) { - Part instantiatorPart = msg.getPart(i); + Part instantiatorPart = clientMessage.getPart(i); serializedInstantiators[i] = instantiatorPart.getSerializedForm(); String instantiatorClassName = (String) CacheServerHelper.deserialize(serializedInstantiators[i]); - Part instantiatedPart = msg.getPart(i + 1); + Part instantiatedPart = clientMessage.getPart(i + 1); serializedInstantiators[i + 1] = instantiatedPart.getSerializedForm(); String instantiatedClassName = (String) CacheServerHelper.deserialize(serializedInstantiators[i + 1]); - Part idPart = msg.getPart(i + 2); + Part idPart = clientMessage.getPart(i + 2); serializedInstantiators[i + 2] = idPart.getSerializedForm(); int id = idPart.getInt(); @@ -92,7 +95,7 @@ public class RegisterInstantiators extends BaseCommand { instantiatorClass = InternalDataSerializer.getCachedClass(instantiatorClassName); instantiatedClass = InternalDataSerializer.getCachedClass(instantiatedClassName); InternalInstantiator.register(instantiatorClass, instantiatedClass, id, true, eventId, - servConn.getProxyID()); + serverConnection.getProxyID()); } catch (ClassNotFoundException e) { // If a ClassNotFoundException is caught, store it, but continue // processing other instantiators @@ -102,17 +105,17 @@ public class RegisterInstantiators extends BaseCommand { } } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.RegisterInstantiators_BAD_CLIENT, - new Object[] {servConn.getMembershipID(), e.getLocalizedMessage()})); - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + new Object[] {serverConnection.getMembershipID(), e.getLocalizedMessage()})); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); } // If a ClassNotFoundException was caught while processing the // instantiators, send it back to the client. Note: This only sends // the last CNFE. if (caughtCNFE) { - writeException(msg, cnfe, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, cnfe, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); // Send the instantiators on to other clients if we hit an error // due to a missing class, because they were not distributed @@ -120,7 +123,7 @@ public class RegisterInstantiators extends BaseCommand { // been distributed if successfully registered. ClientInstantiatorMessage clientInstantiatorMessage = new ClientInstantiatorMessage(EnumListenerEvent.AFTER_REGISTER_INSTANTIATOR, - serializedInstantiators, servConn.getProxyID(), eventId); + serializedInstantiators, serverConnection.getProxyID(), eventId); // Notify other clients CacheClientNotifier.routeClientMessage(clientInstantiatorMessage); @@ -129,12 +132,13 @@ public class RegisterInstantiators extends BaseCommand { // Send reply to client if necessary. If an exception occurs in the above // code, then the reply has already been sent. - if (!servConn.getTransientFlag(RESPONDED)) { - writeReply(msg, servConn); + if (!serverConnection.getTransientFlag(RESPONDED)) { + writeReply(clientMessage, serverConnection); } if (logger.isDebugEnabled()) { - logger.debug("Registered instantiators for MembershipId = {}", servConn.getMembershipID()); + logger.debug("Registered instantiators for MembershipId = {}", + serverConnection.getMembershipID()); } } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java index 52a929f..edd917a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest.java @@ -46,51 +46,51 @@ public class RegisterInterest extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null; String regionName = null; Object key = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); InterestResultPolicy policy = null; // Retrieve the interest type - int interestType = msg.getPart(1).getInt(); + int interestType = clientMessage.getPart(1).getInt(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(2).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(2).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(3); + Part durablePart = clientMessage.getPart(3); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Retrieve the key - keyPart = msg.getPart(4); + keyPart = clientMessage.getPart(4); regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -98,21 +98,22 @@ public class RegisterInterest extends BaseCommand { // VJR: Check for a sixth part for client version 6.0.3 onwards for the // time being until refactoring into a new command version. - if (msg.getNumberOfParts() > 5) { + if (clientMessage.getNumberOfParts() > 5) { try { - Part notifyPart = msg.getPart(5); + Part notifyPart = clientMessage.getPart(5); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } if (logger.isDebugEnabled()) { logger.debug("{}: Received register interest request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), regionName, key); } // Process the register interest request @@ -126,19 +127,19 @@ public class RegisterInterest extends BaseCommand { message = LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } - logger.warn("{}: {}", servConn.getName(), message.toLocalizedString()); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, - message.toLocalizedString(), servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message.toLocalizedString()); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, + message.toLocalizedString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // input key not null - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterest_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] {serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; @@ -151,7 +152,7 @@ public class RegisterInterest extends BaseCommand { this.securityService.authorizeRegionRead(regionName, key.toString()); } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -159,14 +160,15 @@ public class RegisterInterest extends BaseCommand { key = registerContext.getKey(); } } - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, key, - servConn.getProxyID(), interestType, isDurable, sendUpdatesAsInvalidates, false, 0, true); + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, + key, serverConnection.getProxyID(), interestType, isDurable, sendUpdatesAsInvalidates, + false, 0, true); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -177,46 +179,47 @@ public class RegisterInterest extends BaseCommand { // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - CacheClientProxy ccp = - servConn.getAcceptor().getCacheClientNotifier().getClientProxy(servConn.getProxyID()); + CacheClientProxy ccp = serverConnection.getAcceptor().getCacheClientNotifier() + .getClientProxy(serverConnection.getProxyID()); if (ccp == null) { // fix for 37593 IOException ioex = new IOException( LocalizedStrings.RegisterInterest_CACHECLIENTPROXY_FOR_THIS_CLIENT_IS_NO_LONGER_ON_THE_SERVER_SO_REGISTERINTEREST_OPERATION_IS_UNSUCCESSFUL .toLocalizedString()); - writeChunkedException(msg, ioex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ioex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isPrimary = ccp.isPrimary(); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } // !isPrimary else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendRegisterInterestResponseChunks(region, key, interestType, policy, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendRegisterInterestResponseChunks(region, key, interestType, policy, + serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn, chunkedResponseMsg); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection, chunkedResponseMsg); + serverConnection.setAsTrue(RESPONDED); return; } @@ -224,8 +227,8 @@ public class RegisterInterest extends BaseCommand { // logger.debug(getName() + ": Sent chunk (1 of 1) of register interest // response (" + chunkedResponseMsg.getBufferLength() + " bytes) for // region " + regionName + " key " + key); - logger.debug("{}: Sent register interest response for region {} key {}", servConn.getName(), - regionName, key); + logger.debug("{}: Sent register interest response for region {} key {}", + serverConnection.getName(), regionName, key); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java index 5ddb241..bad3bed 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterest61.java @@ -60,67 +60,67 @@ public class RegisterInterest61 extends BaseCommand { RegisterInterest61() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null; String regionName = null; Object key = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); InterestResultPolicy policy = null; // Retrieve the interest type - int interestType = msg.getPart(1).getInt(); + int interestType = clientMessage.getPart(1).getInt(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(2).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(2).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(3); + Part durablePart = clientMessage.getPart(3); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // region data policy byte[] regionDataPolicyPartBytes; boolean serializeValues = false; try { - Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts() - 1); + Part regionDataPolicyPart = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); regionDataPolicyPartBytes = (byte[]) regionDataPolicyPart.getObject(); - if (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0) { + if (serverConnection.getClientVersion().compareTo(Version.GFE_80) >= 0) { // The second byte here is serializeValues serializeValues = regionDataPolicyPartBytes[1] == (byte) 0x01; } } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Retrieve the key - keyPart = msg.getPart(4); + keyPart = clientMessage.getPart(4); regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -128,14 +128,14 @@ public class RegisterInterest61 extends BaseCommand { // VJR: Check for a sixth part for client version 6.0.3 onwards for the // time being until refactoring into a new command version. - if (msg.getNumberOfParts() > 5) { + if (clientMessage.getNumberOfParts() > 5) { try { - Part notifyPart = msg.getPart(5); + Part notifyPart = clientMessage.getPart(5); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -143,7 +143,8 @@ public class RegisterInterest61 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received register interest 61 request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), regionName, key); } // test hook to trigger vMotion during register Interest @@ -164,19 +165,19 @@ public class RegisterInterest61 extends BaseCommand { message = LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } - logger.warn("{}: {}", servConn.getName(), message.toLocalizedString()); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, - message.toLocalizedString(), servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message.toLocalizedString()); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, + message.toLocalizedString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // input key not null - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterest_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] {serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; @@ -190,7 +191,7 @@ public class RegisterInterest61 extends BaseCommand { this.securityService.authorizeRegionRead(regionName, key.toString()); } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -198,15 +199,15 @@ public class RegisterInterest61 extends BaseCommand { key = registerContext.getKey(); } } - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, key, - servConn.getProxyID(), interestType, isDurable, sendUpdatesAsInvalidates, true, - regionDataPolicyPartBytes[0], true); + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, + key, serverConnection.getProxyID(), interestType, isDurable, sendUpdatesAsInvalidates, + true, regionDataPolicyPartBytes[0], true); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -217,47 +218,47 @@ public class RegisterInterest61 extends BaseCommand { // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - CacheClientProxy ccp = - servConn.getAcceptor().getCacheClientNotifier().getClientProxy(servConn.getProxyID()); + CacheClientProxy ccp = serverConnection.getAcceptor().getCacheClientNotifier() + .getClientProxy(serverConnection.getProxyID()); if (ccp == null) { // fix for 37593 IOException ioex = new IOException( LocalizedStrings.RegisterInterest_CACHECLIENTPROXY_FOR_THIS_CLIENT_IS_NO_LONGER_ON_THE_SERVER_SO_REGISTERINTEREST_OPERATION_IS_UNSUCCESSFUL .toLocalizedString()); - writeChunkedException(msg, ioex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ioex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isPrimary = ccp.isPrimary(); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } // !isPrimary else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { fillAndSendRegisterInterestResponseChunks(region, key, interestType, serializeValues, - policy, servConn); - servConn.setAsTrue(RESPONDED); + policy, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn, chunkedResponseMsg); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection, chunkedResponseMsg); + serverConnection.setAsTrue(RESPONDED); return; } @@ -265,8 +266,8 @@ public class RegisterInterest61 extends BaseCommand { // logger.debug(getName() + ": Sent chunk (1 of 1) of register interest // response (" + chunkedResponseMsg.getBufferLength() + " bytes) for // region " + regionName + " key " + key); - logger.debug("{}: Sent register interest response for region {} key {}", servConn.getName(), - regionName, key); + logger.debug("{}: Sent register interest response for region {} key {}", + serverConnection.getName(), regionName, key); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java index cd16790..5f5fafa 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList.java @@ -49,57 +49,57 @@ public class RegisterInterestList extends BaseCommand { RegisterInterestList() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null, numberOfKeysPart = null; String regionName = null; Object key = null; InterestResultPolicy policy; List keys = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); int numberOfKeys = 0, partNumber = 0; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(1).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(1).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(2); + Part durablePart = clientMessage.getPart(2); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - numberOfKeysPart = msg.getPart(3); + numberOfKeysPart = clientMessage.getPart(3); numberOfKeys = numberOfKeysPart.getInt(); partNumber = 4; keys = new ArrayList(); for (int i = 0; i < numberOfKeys; i++) { - keyPart = msg.getPart(partNumber + i); + keyPart = clientMessage.getPart(partNumber + i); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } keys.add(key); @@ -109,14 +109,14 @@ public class RegisterInterestList extends BaseCommand { // VJR: Check for an extra part for client version 6.0.3 onwards for the // time being until refactoring into a new command version. - if (msg.getNumberOfParts() > (numberOfKeys + partNumber)) { + if (clientMessage.getNumberOfParts() > (numberOfKeys + partNumber)) { try { - Part notifyPart = msg.getPart(numberOfKeys + partNumber); + Part notifyPart = clientMessage.getPart(numberOfKeys + partNumber); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -124,8 +124,8 @@ public class RegisterInterestList extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received register interest request ({} bytes) from {} for the following {} keys in region {}: {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys, - regionName, keys); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), numberOfKeys, regionName, keys); } /* @@ -154,25 +154,26 @@ public class RegisterInterestList extends BaseCommand { LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, s, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // key not null - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterestList_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_LIST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] {serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; } // else { // region not null try { this.securityService.authorizeRegionRead(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -181,14 +182,14 @@ public class RegisterInterestList extends BaseCommand { } } // Register interest - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, - servConn.getProxyID(), isDurable, sendUpdatesAsInvalidates, false, 0, true); + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, + keys, serverConnection.getProxyID(), isDurable, sendUpdatesAsInvalidates, false, 0, true); } catch (Exception ex) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, ex); + checkForInterrupt(serverConnection, ex); // Otherwise, write an exception message and continue - writeChunkedException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -197,36 +198,37 @@ public class RegisterInterestList extends BaseCommand { // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - boolean isPrimary = servConn.getAcceptor().getCacheClientNotifier() - .getClientProxy(servConn.getProxyID()).isPrimary(); + boolean isPrimary = serverConnection.getAcceptor().getCacheClientNotifier() + .getClientProxy(serverConnection.getProxyID()).isPrimary(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, policy, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, policy, + serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // otherwise send the exception back to client - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -236,7 +238,7 @@ public class RegisterInterestList extends BaseCommand { // region " + regionName + " key " + key); logger.debug( "{}: Sent register interest response for the following {} keys in region {}: {}", - servConn.getName(), numberOfKeys, regionName, keys); + serverConnection.getName(), numberOfKeys, regionName, keys); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java index 6e006ca..40a3c25 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList61.java @@ -49,67 +49,67 @@ public class RegisterInterestList61 extends BaseCommand { RegisterInterestList61() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null, numberOfKeysPart = null; String regionName = null; Object key = null; InterestResultPolicy policy; List keys = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); int numberOfKeys = 0, partNumber = 0; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(1).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(1).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(2); + Part durablePart = clientMessage.getPart(2); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // region data policy byte[] regionDataPolicyPartBytes; try { - Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts() - 1); + Part regionDataPolicyPart = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); regionDataPolicyPartBytes = (byte[]) regionDataPolicyPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - numberOfKeysPart = msg.getPart(3); + numberOfKeysPart = clientMessage.getPart(3); numberOfKeys = numberOfKeysPart.getInt(); partNumber = 4; keys = new ArrayList(); for (int i = 0; i < numberOfKeys; i++) { - keyPart = msg.getPart(partNumber + i); + keyPart = clientMessage.getPart(partNumber + i); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } keys.add(key); @@ -119,14 +119,14 @@ public class RegisterInterestList61 extends BaseCommand { // VJR: Check for an extra part for client version 6.0.3 onwards for the // time being until refactoring into a new command version. - if (msg.getNumberOfParts() > (numberOfKeys + partNumber)) { + if (clientMessage.getNumberOfParts() > (numberOfKeys + partNumber)) { try { - Part notifyPart = msg.getPart(numberOfKeys + partNumber); + Part notifyPart = clientMessage.getPart(numberOfKeys + partNumber); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -134,8 +134,8 @@ public class RegisterInterestList61 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received register interest 61 request ({} bytes) from {} for the following {} keys in region {}: {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys, - regionName, keys); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), numberOfKeys, regionName, keys); } /* @@ -164,25 +164,26 @@ public class RegisterInterestList61 extends BaseCommand { LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, s, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterestList_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_LIST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] {serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; } // else { // region not null try { this.securityService.authorizeRegionRead(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -191,15 +192,15 @@ public class RegisterInterestList61 extends BaseCommand { } } // Register interest - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, - servConn.getProxyID(), isDurable, sendUpdatesAsInvalidates, true, + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, + keys, serverConnection.getProxyID(), isDurable, sendUpdatesAsInvalidates, true, regionDataPolicyPartBytes[0], true); } catch (Exception ex) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, ex); + checkForInterrupt(serverConnection, ex); // Otherwise, write an exception message and continue - writeChunkedException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -208,36 +209,37 @@ public class RegisterInterestList61 extends BaseCommand { // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - boolean isPrimary = servConn.getAcceptor().getCacheClientNotifier() - .getClientProxy(servConn.getProxyID()).isPrimary(); + boolean isPrimary = serverConnection.getAcceptor().getCacheClientNotifier() + .getClientProxy(serverConnection.getProxyID()).isPrimary(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, policy, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, policy, + serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // otherwise send the exception back to client - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -247,7 +249,7 @@ public class RegisterInterestList61 extends BaseCommand { // region " + regionName + " key " + key); logger.debug( "{}: Sent register interest response for the following {} keys in region {}: {}", - servConn.getName(), numberOfKeys, regionName, keys); + serverConnection.getName(), numberOfKeys, regionName, keys); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java index 8a61364..6a2ad95 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RegisterInterestList66.java @@ -55,88 +55,88 @@ public class RegisterInterestList66 extends BaseCommand { RegisterInterestList66() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null;// numberOfKeysPart = null; String regionName = null; Object key = null; InterestResultPolicy policy; List keys = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); int numberOfKeys = 0, partNumber = 0; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); - ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + ChunkedMessage chunkedResponseMsg = serverConnection.getRegisterInterestResponseMessage(); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the InterestResultPolicy try { - policy = (InterestResultPolicy) msg.getPart(1).getObject(); + policy = (InterestResultPolicy) clientMessage.getPart(1).getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean isDurable = false; try { - Part durablePart = msg.getPart(2); + Part durablePart = clientMessage.getPart(2); byte[] durablePartBytes = (byte[]) durablePart.getObject(); isDurable = durablePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // region data policy byte[] regionDataPolicyPartBytes; boolean serializeValues = false; try { - Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts() - 1); + Part regionDataPolicyPart = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); regionDataPolicyPartBytes = (byte[]) regionDataPolicyPart.getObject(); - if (servConn.getClientVersion().compareTo(Version.GFE_80) >= 0) { + if (serverConnection.getClientVersion().compareTo(Version.GFE_80) >= 0) { // The second byte here is serializeValues serializeValues = regionDataPolicyPartBytes[1] == (byte) 0x01; } } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } partNumber = 3; - Part list = msg.getPart(partNumber); + Part list = clientMessage.getPart(partNumber); try { keys = (List) list.getObject(); numberOfKeys = keys.size(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean sendUpdatesAsInvalidates = false; try { - Part notifyPart = msg.getPart(partNumber + 1); + Part notifyPart = clientMessage.getPart(partNumber + 1); byte[] notifyPartBytes = (byte[]) notifyPart.getObject(); sendUpdatesAsInvalidates = notifyPartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug( "{}: Received register interest 66 request ({} bytes) from {} for the following {} keys in region {}: {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys, - regionName, keys); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), numberOfKeys, regionName, keys); } /* @@ -165,24 +165,25 @@ public class RegisterInterestList66 extends BaseCommand { LocalizedStrings.RegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_REGISTER_INTEREST_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeChunkedErrorResponse(msg, MessageType.REGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeChunkedErrorResponse(clientMessage, MessageType.REGISTER_INTEREST_DATA_ERROR, s, + serverConnection); + serverConnection.setAsTrue(RESPONDED); } // key not null - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { logger.info(LocalizedMessage.create( LocalizedStrings.RegisterInterestList_0_REGION_NAMED_1_WAS_NOT_FOUND_DURING_REGISTER_INTEREST_LIST_REQUEST, - new Object[] {servConn.getName(), regionName})); + new Object[] {serverConnection.getName(), regionName})); // writeChunkedErrorResponse(msg, // MessageType.REGISTER_INTEREST_DATA_ERROR, message); // responded = true; } // else { // region not null try { this.securityService.authorizeRegionRead(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegisterInterestOperationContext registerContext = @@ -191,15 +192,15 @@ public class RegisterInterestList66 extends BaseCommand { } } // Register interest - servConn.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, keys, - servConn.getProxyID(), isDurable, sendUpdatesAsInvalidates, true, + serverConnection.getAcceptor().getCacheClientNotifier().registerClientInterest(regionName, + keys, serverConnection.getProxyID(), isDurable, sendUpdatesAsInvalidates, true, regionDataPolicyPartBytes[0], true); } catch (Exception ex) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, ex); + checkForInterrupt(serverConnection, ex); // Otherwise, write an exception message and continue - writeChunkedException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ex, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -208,37 +209,37 @@ public class RegisterInterestList66 extends BaseCommand { // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - boolean isPrimary = servConn.getAcceptor().getCacheClientNotifier() - .getClientProxy(servConn.getProxyID()).isPrimary(); + boolean isPrimary = serverConnection.getAcceptor().getCacheClientNotifier() + .getClientProxy(serverConnection.getProxyID()).isPrimary(); if (!isPrimary) { chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_SECONDARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); chunkedResponseMsg.setLastChunk(true); if (logger.isDebugEnabled()) { logger.debug( "{}: Sending register interest response chunk from secondary for region: {} for key: {} chunk=<{}>", - servConn.getName(), regionName, key, chunkedResponseMsg); + serverConnection.getName(), regionName, key, chunkedResponseMsg); } - chunkedResponseMsg.sendChunk(servConn); + chunkedResponseMsg.sendChunk(serverConnection); } else { // isPrimary // Send header which describes how many chunks will follow chunkedResponseMsg.setMessageType(MessageType.RESPONSE_FROM_PRIMARY); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { fillAndSendRegisterInterestResponseChunks(region, keys, InterestType.KEY, serializeValues, - policy, servConn); - servConn.setAsTrue(RESPONDED); + policy, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // otherwise send the exception back to client - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -248,7 +249,7 @@ public class RegisterInterestList66 extends BaseCommand { // region " + regionName + " key " + key); logger.debug( "{}: Sent register interest response for the following {} keys in region {}: {}", - servConn.getName(), numberOfKeys, regionName, keys); + serverConnection.getName(), numberOfKeys, regionName, keys); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java index 88386a1..a295c54 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java @@ -61,7 +61,7 @@ public class RemoveAll extends BaseCommand { protected RemoveAll() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long startp) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startp) throws IOException, InterruptedException { long start = startp; // copy this since we need to modify it Part regionNamePart = null, numberOfKeysPart = null, keyPart = null; @@ -73,11 +73,11 @@ public class RemoveAll extends BaseCommand { VersionedObjectList response = null; StringBuffer errMessage = new StringBuffer(); - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); { long oldStart = start; start = DistributionStats.getStatTime(); @@ -87,7 +87,7 @@ public class RemoveAll extends BaseCommand { try { // Retrieve the data from the message parts // part 0: region name - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); if (regionName == null) { @@ -95,45 +95,47 @@ public class RemoveAll extends BaseCommand { LocalizedStrings.RemoveAll_THE_INPUT_REGION_NAME_FOR_THE_REMOVEALL_REQUEST_IS_NULL .toLocalizedString(); logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON, - new Object[] {servConn.getName(), txt})); + new Object[] {serverConnection.getName(), txt})); errMessage.append(txt); - writeChunkedErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, errMessage.toString(), + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during removeAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // part 1: eventID - eventPart = msg.getPart(1); + eventPart = clientMessage.getPart(1); ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); - EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId); + EventID eventId = + new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId); Breadcrumbs.setEventId(eventId); // part 2: flags - int flags = msg.getPart(2).getInt(); + int flags = clientMessage.getPart(2).getInt(); boolean clientIsEmpty = (flags & PutAllOp.FLAG_EMPTY) != 0; boolean clientHasCCEnabled = (flags & PutAllOp.FLAG_CONCURRENCY_CHECKS) != 0; // part 3: callbackArg - Object callbackArg = msg.getPart(3).getObject(); + Object callbackArg = clientMessage.getPart(3).getObject(); // part 4: number of keys - numberOfKeysPart = msg.getPart(4); + numberOfKeysPart = clientMessage.getPart(4); numberOfKeys = numberOfKeysPart.getInt(); if (logger.isDebugEnabled()) { StringBuilder buffer = new StringBuilder(); - buffer.append(servConn.getName()).append(": Received removeAll request from ") - .append(servConn.getSocketString()).append(" for region ").append(regionName) + buffer.append(serverConnection.getName()).append(": Received removeAll request from ") + .append(serverConnection.getSocketString()).append(" for region ").append(regionName) .append(callbackArg != null ? (" callbackArg " + callbackArg) : "").append(" with ") .append(numberOfKeys).append(" keys."); logger.debug(buffer); @@ -141,21 +143,21 @@ public class RemoveAll extends BaseCommand { ArrayList<Object> keys = new ArrayList<Object>(numberOfKeys); ArrayList<VersionTag> retryVersions = new ArrayList<VersionTag>(numberOfKeys); for (int i = 0; i < numberOfKeys; i++) { - keyPart = msg.getPart(5 + i); + keyPart = clientMessage.getPart(5 + i); key = keyPart.getStringOrObject(); if (key == null) { String txt = LocalizedStrings.RemoveAll_ONE_OF_THE_INPUT_KEYS_FOR_THE_REMOVEALL_REQUEST_IS_NULL .toLocalizedString(); logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON, - new Object[] {servConn.getName(), txt})); + new Object[] {serverConnection.getName(), txt})); errMessage.append(txt); - writeChunkedErrorResponse(msg, MessageType.PUT_DATA_ERROR, errMessage.toString(), - servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedErrorResponse(clientMessage, MessageType.PUT_DATA_ERROR, + errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - if (msg.isRetry()) { + if (clientMessage.isRetry()) { // Constuct the thread id/sequence id information for this element of the bulk op // The sequence id is constructed from the base sequence id and the offset @@ -181,15 +183,16 @@ public class RemoveAll extends BaseCommand { keys.add(key); } // for - if (msg.getNumberOfParts() == (5 + numberOfKeys + 1)) {// it means optional timeout has been - // added - int timeout = msg.getPart(5 + numberOfKeys).getInt(); - servConn.setRequestSpecificTimeout(timeout); + if (clientMessage.getNumberOfParts() == (5 + numberOfKeys + 1)) {// it means optional timeout + // has been + // added + int timeout = clientMessage.getPart(5 + numberOfKeys).getInt(); + serverConnection.setRequestSpecificTimeout(timeout); } this.securityService.authorizeRegionWrite(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { authzRequest.createRegionAuthorize(regionName); @@ -200,8 +203,8 @@ public class RemoveAll extends BaseCommand { } } - response = region.basicBridgeRemoveAll(keys, retryVersions, servConn.getProxyID(), eventId, - callbackArg); + response = region.basicBridgeRemoveAll(keys, retryVersions, serverConnection.getProxyID(), + eventId, callbackArg); if (!region.getConcurrencyChecksEnabled() || clientIsEmpty || !clientHasCCEnabled) { // the client only needs this if versioning is being used and the client // has storage @@ -216,33 +219,34 @@ public class RemoveAll extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { - writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType()); + writeReplyWithRefreshMetadata(clientMessage, response, serverConnection, pr, + pr.getNetworkHopType()); pr.clearNetworkHopData(); replyWithMetaData = true; } } } catch (RegionDestroyedException rde) { - writeChunkedException(msg, rde, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, rde, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } catch (ResourceException re) { - writeChunkedException(msg, re, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, re, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } catch (PutAllPartialResultException pre) { - writeChunkedException(msg, pre, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, pre, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } catch (Exception ce) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, ce); + checkForInterrupt(serverConnection, ce); // If an exception occurs during the op, preserve the connection - writeChunkedException(msg, ce, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, ce, serverConnection); + serverConnection.setAsTrue(RESPONDED); // if (logger.fineEnabled()) { logger.warn(LocalizedMessage.create(LocalizedStrings.Generic_0_UNEXPECTED_EXCEPTION, - servConn.getName()), ce); + serverConnection.getName()), ce); // } return; } finally { @@ -251,20 +255,21 @@ public class RemoveAll extends BaseCommand { stats.incProcessRemoveAllTime(start - oldStart); } if (logger.isDebugEnabled()) { - logger.debug("{}: Sending removeAll response back to {} for region {}{}", servConn.getName(), - servConn.getSocketString(), regionName, (logger.isTraceEnabled() ? ": " + response : "")); + logger.debug("{}: Sending removeAll response back to {} for region {}{}", + serverConnection.getName(), serverConnection.getSocketString(), regionName, + (logger.isTraceEnabled() ? ": " + response : "")); } // Increment statistics and write the reply if (!replyWithMetaData) { - writeReply(msg, response, servConn); + writeReply(clientMessage, response, serverConnection); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); stats.incWriteRemoveAllResponseTime(DistributionStats.getStatTime() - start); } @Override - protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException { + protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException { throw new UnsupportedOperationException(); } @@ -285,7 +290,7 @@ public class RemoveAll extends BaseCommand { } replyMsg.sendHeader(); if (listSize > 0) { - int chunkSize = 2 * maximumChunkSize; + int chunkSize = 2 * MAXIMUM_CHUNK_SIZE; // Chunker will stream over the list in its toData method VersionedObjectList.Chunker chunk = new VersionedObjectList.Chunker(response, chunkSize, false, false); @@ -317,7 +322,7 @@ public class RemoveAll extends BaseCommand { } @Override - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn, + protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, PartitionedRegion pr, byte nwHop) throws IOException { throw new UnsupportedOperationException(); } @@ -345,7 +350,7 @@ public class RemoveAll extends BaseCommand { replyMsg.setLastChunk(false); replyMsg.sendChunk(servConn); - int chunkSize = 2 * maximumChunkSize; // maximumChunkSize + int chunkSize = 2 * MAXIMUM_CHUNK_SIZE; // MAXIMUM_CHUNK_SIZE // Chunker will stream over the list in its toData method VersionedObjectList.Chunker chunk = new VersionedObjectList.Chunker(response, chunkSize, false, false); @@ -371,7 +376,7 @@ public class RemoveAll extends BaseCommand { } pr.getPrStats().incPRMetaDataSentCount(); if (logger.isTraceEnabled()) { - logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(), + logger.trace("{}: rpl with REFRESH_METADATA tx: {}", servConn.getName(), origMsg.getTransactionId()); } } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java index 42a5bec..cc42e0d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveUserAuth.java @@ -33,9 +33,9 @@ public class RemoveUserAuth extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { - boolean isSecureMode = msg.isSecureMode(); + boolean isSecureMode = clientMessage.isSecureMode(); if (!isSecureMode) { // need to throw exception @@ -43,29 +43,29 @@ public class RemoveUserAuth extends BaseCommand { } try { - servConn.setAsTrue(REQUIRES_RESPONSE); - Part keepalivePart = msg.getPart(0); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + Part keepalivePart = clientMessage.getPart(0); byte[] keepaliveByte = keepalivePart.getSerializedForm(); boolean keepalive = (keepaliveByte == null || keepaliveByte[0] == 0) ? false : true; - servConn.getSecurityLogWriter().fine("remove user auth keep alive " + keepalive); - servConn.removeUserAuth(msg, keepalive); - writeReply(msg, servConn); + serverConnection.getSecurityLogWriter().fine("remove user auth keep alive " + keepalive); + serverConnection.removeUserAuth(clientMessage, keepalive); + writeReply(clientMessage, serverConnection); } catch (GemFireSecurityException gfse) { - if (servConn.getSecurityLogWriter().warningEnabled()) { - servConn.getSecurityLogWriter().warning(LocalizedStrings.ONE_ARG, - servConn.getName() + ": Security exception: " + gfse.getMessage()); + if (serverConnection.getSecurityLogWriter().warningEnabled()) { + serverConnection.getSecurityLogWriter().warning(LocalizedStrings.ONE_ARG, + serverConnection.getName() + ": Security exception: " + gfse.getMessage()); } - writeException(msg, gfse, false, servConn); + writeException(clientMessage, gfse, false, serverConnection); } catch (Exception ex) { // TODO Auto-generated catch block - if (servConn.getLogWriter().warningEnabled()) { - servConn.getLogWriter().warning( + if (serverConnection.getLogWriter().warningEnabled()) { + serverConnection.getLogWriter().warning( LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, - new Object[] {servConn.getProxyID(), ""}, ex); + new Object[] {serverConnection.getProxyID(), ""}, ex); } - writeException(msg, ex, false, servConn); + writeException(clientMessage, ex, false, serverConnection); } finally { - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } }