http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java index 0ed7235..674082c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java @@ -64,7 +64,8 @@ public class ExecuteRegionFunction66 extends BaseCommand { private ExecuteRegionFunction66() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) + throws IOException { String regionName = null; Object function = null; Object args = null; @@ -80,7 +81,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { byte functionState = 0; int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT; try { - byte[] bytes = msg.getPart(0).getSerializedForm(); + byte[] bytes = clientMessage.getPart(0).getSerializedForm(); functionState = bytes[0]; if (bytes.length >= 5 && servConn.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) { @@ -95,17 +96,17 @@ public class ExecuteRegionFunction66 extends BaseCommand { servConn.setAsTrue(REQUIRES_RESPONSE); servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } - regionName = msg.getPart(1).getString(); - function = msg.getPart(2).getStringOrObject(); - args = msg.getPart(3).getObject(); - Part part = msg.getPart(4); + regionName = clientMessage.getPart(1).getString(); + function = clientMessage.getPart(2).getStringOrObject(); + args = clientMessage.getPart(3).getObject(); + Part part = clientMessage.getPart(4); if (part != null) { Object obj = part.getObject(); if (obj instanceof MemberMappedArgument) { memberMappedArg = (MemberMappedArgument) obj; } } - byte[] flags = msg.getPart(5).getSerializedForm(); + byte[] flags = clientMessage.getPart(5).getSerializedForm(); if (servConn.getClientVersion().ordinal() > Version.GFE_81.ordinal()) { isBucketsAsFilter = (flags[0] & ExecuteFunctionHelper.BUCKETS_AS_FILTER_MASK) != 0; isReExecute = (flags[0] & ExecuteFunctionHelper.IS_REXECUTE_MASK) != 0 ? (byte) 1 : 0; @@ -113,24 +114,24 @@ public class ExecuteRegionFunction66 extends BaseCommand { isReExecute = flags[0]; isBucketsAsFilter = false; } - filterSize = msg.getPart(6).getInt(); + filterSize = clientMessage.getPart(6).getInt(); if (filterSize != 0) { filter = new HashSet<Object>(); partNumber = 7; for (int i = 0; i < filterSize; i++) { - filter.add(msg.getPart(partNumber + i).getStringOrObject()); + filter.add(clientMessage.getPart(partNumber + i).getStringOrObject()); } } partNumber = 7 + filterSize; - removedNodesSize = msg.getPart(partNumber).getInt(); + removedNodesSize = clientMessage.getPart(partNumber).getInt(); if (removedNodesSize != 0) { removedNodesSet = new HashSet<Object>(); partNumber = partNumber + 1; for (int i = 0; i < removedNodesSize; i++) { - removedNodesSet.add(msg.getPart(partNumber + i).getStringOrObject()); + removedNodesSet.add(clientMessage.getPart(partNumber + i).getStringOrObject()); } } @@ -139,9 +140,9 @@ public class ExecuteRegionFunction66 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(msg, exception, false, servConn); + writeChunkedException(clientMessage, exception, servConn); } else { - writeException(msg, exception, false, servConn); + writeException(clientMessage, exception, false, servConn); } servConn.setAsTrue(RESPONDED); return; @@ -159,7 +160,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { .toLocalizedString("region"); } logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, msg, message, servConn); + sendError(hasResult, clientMessage, message, servConn); return; } @@ -169,7 +170,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST .toLocalizedString(regionName); logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, msg, message, servConn); + sendError(hasResult, clientMessage, message, servConn); return; } HandShake handShake = (HandShake) servConn.getHandshake(); @@ -185,7 +186,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED .toLocalizedString(function); logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, msg, message, servConn); + sendError(hasResult, clientMessage, message, servConn); return; } else { byte functionStateOnServerSide = AbstractExecution.getFunctionState(functionObject.isHA(), @@ -199,7 +200,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER .toLocalizedString(function); logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, msg, message, servConn); + sendError(hasResult, clientMessage, message, servConn); return; } } @@ -222,7 +223,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { // Construct execution AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region); ChunkedMessage m = servConn.getFunctionResponseMessage(); - m.setTransactionId(msg.getTransactionId()); + m.setTransactionId(clientMessage.getTransactionId()); resultSender = new ServerToClientFunctionResultSender65(m, MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext); @@ -276,7 +277,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { } else { execution.execute(functionObject); } - writeReply(msg, servConn); + writeReply(clientMessage, servConn); } } catch (IOException ioe) { logger.warn(LocalizedMessage.create( @@ -284,7 +285,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { function), ioe); final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY .toLocalizedString(); - sendException(hasResult, msg, message, servConn, ioe); + sendException(hasResult, clientMessage, message, servConn, ioe); } catch (FunctionException fe) { String message = fe.getMessage(); Object cause = fe.getCause(); @@ -321,7 +322,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe); - sendException(hasResult, msg, message, servConn, fe); + sendException(hasResult, clientMessage, message, servConn, fe); } } catch (Exception e) { @@ -329,7 +330,7 @@ public class ExecuteRegionFunction66 extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); String message = e.getMessage(); - sendException(hasResult, msg, message, servConn, e); + sendException(hasResult, clientMessage, message, servConn, e); } finally { handShake.setClientReadTimeout(earlierClientReadTimeout); ServerConnection.executeFunctionOnLocalNodeOnly((byte) 0);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java index 8b2cf75..cf96137 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java @@ -62,7 +62,8 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { private ExecuteRegionFunctionSingleHop() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection servConn, long start) + throws IOException { String regionName = null; Object function = null; @@ -79,7 +80,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT; try { - byte[] bytes = msg.getPart(0).getSerializedForm(); + byte[] bytes = clientMessage.getPart(0).getSerializedForm(); functionState = bytes[0]; if (bytes.length >= 5 && servConn.getClientVersion().ordinal() >= Version.GFE_8009.ordinal()) { @@ -94,49 +95,49 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { servConn.setAsTrue(REQUIRES_RESPONSE); servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); } - regionName = msg.getPart(1).getString(); - function = msg.getPart(2).getStringOrObject(); - args = msg.getPart(3).getObject(); - Part part = msg.getPart(4); + regionName = clientMessage.getPart(1).getString(); + function = clientMessage.getPart(2).getStringOrObject(); + args = clientMessage.getPart(3).getObject(); + Part part = clientMessage.getPart(4); if (part != null) { Object obj = part.getObject(); if (obj instanceof MemberMappedArgument) { memberMappedArg = (MemberMappedArgument) obj; } } - isExecuteOnAllBuckets = msg.getPart(5).getSerializedForm()[0]; + isExecuteOnAllBuckets = clientMessage.getPart(5).getSerializedForm()[0]; if (isExecuteOnAllBuckets == 1) { filter = new HashSet(); - bucketIdsSize = msg.getPart(6).getInt(); + bucketIdsSize = clientMessage.getPart(6).getInt(); if (bucketIdsSize != 0) { buckets = new HashSet<Integer>(); partNumber = 7; for (int i = 0; i < bucketIdsSize; i++) { - buckets.add(msg.getPart(partNumber + i).getInt()); + buckets.add(clientMessage.getPart(partNumber + i).getInt()); } } partNumber = 7 + bucketIdsSize; } else { - filterSize = msg.getPart(6).getInt(); + filterSize = clientMessage.getPart(6).getInt(); if (filterSize != 0) { filter = new HashSet<Object>(); partNumber = 7; for (int i = 0; i < filterSize; i++) { - filter.add(msg.getPart(partNumber + i).getStringOrObject()); + filter.add(clientMessage.getPart(partNumber + i).getStringOrObject()); } } partNumber = 7 + filterSize; } - removedNodesSize = msg.getPart(partNumber).getInt(); + removedNodesSize = clientMessage.getPart(partNumber).getInt(); if (removedNodesSize != 0) { removedNodesSet = new HashSet<Object>(); partNumber = partNumber + 1; for (int i = 0; i < removedNodesSize; i++) { - removedNodesSet.add(msg.getPart(partNumber + i).getStringOrObject()); + removedNodesSet.add(clientMessage.getPart(partNumber + i).getStringOrObject()); } } @@ -145,7 +146,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), exception); if (hasResult == 1) { - writeChunkedException(msg, exception, false, servConn); + writeChunkedException(clientMessage, exception, servConn); servConn.setAsTrue(RESPONDED); return; } @@ -163,7 +164,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { .toLocalizedString("region"); } logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, msg, message, servConn); + sendError(hasResult, clientMessage, message, servConn); return; } @@ -173,7 +174,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_REGION_NAMED_0_WAS_NOT_FOUND_DURING_EXECUTE_FUNCTION_REQUEST .toLocalizedString(regionName); logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, msg, message, servConn); + sendError(hasResult, clientMessage, message, servConn); return; } HandShake handShake = (HandShake) servConn.getHandshake(); @@ -189,7 +190,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { LocalizedStrings.ExecuteRegionFunction_THE_FUNCTION_0_HAS_NOT_BEEN_REGISTERED .toLocalizedString(function); logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, msg, message, servConn); + sendError(hasResult, clientMessage, message, servConn); return; } else { byte functionStateOnServer = AbstractExecution.getFunctionState(functionObject.isHA(), @@ -199,7 +200,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { LocalizedStrings.FunctionService_FUNCTION_ATTRIBUTE_MISMATCH_CLIENT_SERVER .toLocalizedString(function); logger.warn("{}: {}", servConn.getName(), message); - sendError(hasResult, msg, message, servConn); + sendError(hasResult, clientMessage, message, servConn); return; } } @@ -222,7 +223,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { // Construct execution AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(region); ChunkedMessage m = servConn.getFunctionResponseMessage(); - m.setTransactionId(msg.getTransactionId()); + m.setTransactionId(clientMessage.getTransactionId()); resultSender = new ServerToClientFunctionResultSender65(m, MessageType.EXECUTE_REGION_FUNCTION_RESULT, servConn, functionObject, executeContext); @@ -290,7 +291,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { function), ioe); final String message = LocalizedStrings.ExecuteRegionFunction_SERVER_COULD_NOT_SEND_THE_REPLY .toLocalizedString(); - sendException(hasResult, msg, message, servConn, ioe); + sendException(hasResult, clientMessage, message, servConn, ioe); } catch (FunctionException fe) { String message = fe.getMessage(); @@ -301,21 +302,21 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand { logger.debug("Exception on server while executing function: {}: {}", function, message, fe); } - synchronized (msg) { + synchronized (clientMessage) { resultSender.setException(fe); } } else { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), fe); - sendException(hasResult, msg, message, servConn, fe); + sendException(hasResult, clientMessage, message, servConn, fe); } } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ExecuteRegionFunction_EXCEPTION_ON_SERVER_WHILE_EXECUTIONG_FUNCTION_0, function), e); String message = e.getMessage(); - sendException(hasResult, msg, message, servConn, e); + sendException(hasResult, clientMessage, message, servConn, e); } finally { handShake.setClientReadTimeout(earlierClientReadTimeout); ServerConnection.executeFunctionOnLocalNodeOnly((byte) 0); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java index d44a4ad..d489b88 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java @@ -79,14 +79,14 @@ public class GatewayReceiverCommand extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keyPart = null, valuePart = null, callbackArgPart = null; String regionName = null; Object callbackArg = null, key = null; int partNumber = 0; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - GatewayReceiverStats stats = (GatewayReceiverStats) servConn.getCacheServerStats(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + GatewayReceiverStats stats = (GatewayReceiverStats) serverConnection.getCacheServerStats(); EventID eventId = null; LocalRegion region = null; List<BatchException70> exceptions = new ArrayList<BatchException70>(); @@ -102,20 +102,20 @@ public class GatewayReceiverCommand extends BaseCommand { // statement so that all messages can take advantage of it. boolean earlyAck = false;// msg.getEarlyAck(); - stats.incBatchSize(msg.getPayloadLength()); + stats.incBatchSize(clientMessage.getPayloadLength()); // Retrieve the number of events - Part numberOfEventsPart = msg.getPart(0); + Part numberOfEventsPart = clientMessage.getPart(0); int numberOfEvents = numberOfEventsPart.getInt(); stats.incEventsReceived(numberOfEvents); // Retrieve the batch id - Part batchIdPart = msg.getPart(1); + Part batchIdPart = clientMessage.getPart(1); int batchId = batchIdPart.getInt(); // If this batch has already been seen, do not reply. // Instead, drop the batch and continue. - if (batchId <= servConn.getLatestBatchIdReplied()) { + if (batchId <= serverConnection.getLatestBatchIdReplied()) { if (GatewayReceiver.APPLY_RETRIES) { // Do nothing!!! logger.warn(LocalizedMessage.create( @@ -125,17 +125,17 @@ public class GatewayReceiverCommand extends BaseCommand { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_RECEIVED_PROCESS_BATCH_REQUEST_0_THAT_HAS_ALREADY_BEEN_OR_IS_BEING_PROCESSED__THIS_PROCESS_BATCH_REQUEST_IS_BEING_IGNORED, batchId)); - writeReply(msg, servConn, batchId, numberOfEvents); + writeReply(clientMessage, serverConnection, batchId, numberOfEvents); return; } stats.incDuplicateBatchesReceived(); } // Verify the batches arrive in order - if (batchId != servConn.getLatestBatchIdReplied() + 1) { + if (batchId != serverConnection.getLatestBatchIdReplied() + 1) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_RECEIVED_PROCESS_BATCH_REQUEST_0_OUT_OF_ORDER_THE_ID_OF_THE_LAST_BATCH_PROCESSED_WAS_1_THIS_BATCH_REQUEST_WILL_BE_PROCESSED_BUT_SOME_MESSAGES_MAY_HAVE_BEEN_LOST, - new Object[] {batchId, servConn.getLatestBatchIdReplied()})); + new Object[] {batchId, serverConnection.getLatestBatchIdReplied()})); stats.incOutoforderBatchesReceived(); } @@ -146,7 +146,7 @@ public class GatewayReceiverCommand extends BaseCommand { // If early ack mode, acknowledge right away // Not sure if earlyAck makes sense with sliding window if (earlyAck) { - servConn.incrementLatestBatchIdReplied(batchId); + serverConnection.incrementLatestBatchIdReplied(batchId); // writeReply(msg, servConn); // servConn.setAsTrue(RESPONDED); @@ -162,13 +162,13 @@ public class GatewayReceiverCommand extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received process batch request {} containing {} events ({} bytes) with {} acknowledgement on {}", - servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(), - (earlyAck ? "early" : "normal"), servConn.getSocketString()); + serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), + (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); if (earlyAck) { logger.debug( "{}: Sent process batch early response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}", - servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(), - (earlyAck ? "early" : "normal"), servConn.getSocketString()); + serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), + (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); } } // logger.warn("Received process batch request " + batchId + " containing @@ -185,10 +185,10 @@ public class GatewayReceiverCommand extends BaseCommand { // Retrieve the events from the message parts. The '2' below // represents the number of events (part0) and the batchId (part1) partNumber = 2; - int dsid = msg.getPart(partNumber++).getInt(); + int dsid = clientMessage.getPart(partNumber++).getInt(); boolean removeOnException = - msg.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false; + clientMessage.getPart(partNumber++).getSerializedForm()[0] == 1 ? true : false; // Keep track of whether a response has been written for // exceptions @@ -202,7 +202,7 @@ public class GatewayReceiverCommand extends BaseCommand { indexWithoutPDXEvent++; // System.out.println("Processing event " + i + " in batch " + batchId + " // starting with part number " + partNumber); - Part actionTypePart = msg.getPart(partNumber); + Part actionTypePart = clientMessage.getPart(partNumber); int actionType = actionTypePart.getInt(); long versionTimeStamp = VersionTag.ILLEGAL_VERSION_TIMESTAMP; @@ -211,14 +211,14 @@ public class GatewayReceiverCommand extends BaseCommand { boolean callbackArgExists = false; try { - Part possibleDuplicatePart = msg.getPart(partNumber + 1); + Part possibleDuplicatePart = clientMessage.getPart(partNumber + 1); byte[] possibleDuplicatePartBytes; try { possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -231,7 +231,7 @@ public class GatewayReceiverCommand extends BaseCommand { callbackArg = null; // Retrieve the region name from the message parts - regionNamePart = msg.getPart(partNumber + 2); + regionNamePart = clientMessage.getPart(partNumber + 2); regionName = regionNamePart.getString(); if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) { indexWithoutPDXEvent--; @@ -243,28 +243,28 @@ public class GatewayReceiverCommand extends BaseCommand { // duplication of events, but it is unused now. In // fact the event id is overridden by the FROM_GATEWAY // token. - Part eventIdPart = msg.getPart(partNumber + 3); - eventIdPart.setVersion(servConn.getClientVersion()); + Part eventIdPart = clientMessage.getPart(partNumber + 3); + eventIdPart.setVersion(serverConnection.getClientVersion()); // String eventId = eventIdPart.getString(); try { eventId = (EventID) eventIdPart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; } // Retrieve the key from the message parts - keyPart = msg.getPart(partNumber + 4); + keyPart = clientMessage.getPart(partNumber + 4); try { key = keyPart.getStringOrObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -281,7 +281,7 @@ public class GatewayReceiverCommand extends BaseCommand { */ // Retrieve the value from the message parts (do not deserialize it) - valuePart = msg.getPart(partNumber + 5); + valuePart = clientMessage.getPart(partNumber + 5); // try { // logger.warn(getName() + ": Creating key " + key + " value " + // valuePart.getObject()); @@ -289,18 +289,18 @@ public class GatewayReceiverCommand extends BaseCommand { // Retrieve the callbackArg from the message parts if necessary int index = partNumber + 6; - callbackArgExistsPart = msg.getPart(index++); { + callbackArgExistsPart = clientMessage.getPart(index++); { byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); callbackArgExists = partBytes[0] == 0x01; } if (callbackArgExists) { - callbackArgPart = msg.getPart(index++); + callbackArgPart = clientMessage.getPart(index++); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -309,14 +309,15 @@ public class GatewayReceiverCommand extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Processing batch create request {} on {} for region {} key {} value {} callbackArg {}, eventId={}", - servConn.getName(), batchId, servConn.getSocketString(), regionName, key, - valuePart, callbackArg, eventId); + serverConnection.getName(), batchId, serverConnection.getSocketString(), + regionName, key, valuePart, callbackArg, eventId); } - versionTimeStamp = msg.getPart(index++).getLong(); + versionTimeStamp = clientMessage.getPart(index++).getLong(); // Process the create request if (key == null || regionName == null) { StringId message = null; - Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)}; + Object[] messageArgs = + new Object[] {serverConnection.getName(), Integer.valueOf(batchId)}; if (key == null) { message = LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_CREATE_REQUEST_1_IS_NULL; @@ -331,7 +332,7 @@ public class GatewayReceiverCommand extends BaseCommand { } region = (LocalRegion) crHelper.getRegion(regionName); if (region == null) { - handleRegionNull(servConn, regionName, batchId); + handleRegionNull(serverConnection, regionName, batchId); } else { clientEvent = new EventIDHolder(eventId); if (versionTimeStamp > 0) { @@ -348,7 +349,7 @@ public class GatewayReceiverCommand extends BaseCommand { boolean isObject = valuePart.isObject(); // [sumedh] This should be done on client while sending // since that is the WAN gateway - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { PutOperationContext putContext = authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg); @@ -361,29 +362,29 @@ public class GatewayReceiverCommand extends BaseCommand { result = addPdxType(crHelper, key, value); } else { result = region.basicBridgeCreate(key, value, isObject, callbackArg, - servConn.getProxyID(), false, clientEvent, false); + serverConnection.getProxyID(), false, clientEvent, false); // If the create fails (presumably because it already exists), // attempt to update the entry if (!result) { result = region.basicBridgePut(key, value, null, isObject, callbackArg, - servConn.getProxyID(), false, clientEvent); + serverConnection.getProxyID(), false, clientEvent); } } if (result || clientEvent.isConcurrencyConflict()) { - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); stats.incCreateRequest(); } else { // This exception will be logged in the catch block below throw new Exception( LocalizedStrings.ProcessBatch_0_FAILED_TO_CREATE_OR_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_CALLBACKARG_4 - .toLocalizedString(new Object[] {servConn.getName(), regionName, key, - valuePart, callbackArg})); + .toLocalizedString(new Object[] {serverConnection.getName(), regionName, + key, valuePart, callbackArg})); } } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_CREATE_REQUEST_1_FOR_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -400,7 +401,7 @@ public class GatewayReceiverCommand extends BaseCommand { */ // Retrieve the value from the message parts (do not deserialize it) - valuePart = msg.getPart(partNumber + 5); + valuePart = clientMessage.getPart(partNumber + 5); // try { // logger.warn(getName() + ": Updating key " + key + " value " + // valuePart.getObject()); @@ -408,34 +409,35 @@ public class GatewayReceiverCommand extends BaseCommand { // Retrieve the callbackArg from the message parts if necessary index = partNumber + 6; - callbackArgExistsPart = msg.getPart(index++); { + callbackArgExistsPart = clientMessage.getPart(index++); { byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); callbackArgExists = partBytes[0] == 0x01; } if (callbackArgExists) { - callbackArgPart = msg.getPart(index++); + callbackArgPart = clientMessage.getPart(index++); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; } } - versionTimeStamp = msg.getPart(index++).getLong(); + versionTimeStamp = clientMessage.getPart(index++).getLong(); if (logger.isDebugEnabled()) { logger.debug( "{}: Processing batch update request {} on {} for region {} key {} value {} callbackArg {}", - servConn.getName(), batchId, servConn.getSocketString(), regionName, key, - valuePart, callbackArg); + serverConnection.getName(), batchId, serverConnection.getSocketString(), + regionName, key, valuePart, callbackArg); } // Process the update request if (key == null || regionName == null) { StringId message = null; - Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)}; + Object[] messageArgs = + new Object[] {serverConnection.getName(), Integer.valueOf(batchId)}; if (key == null) { message = LocalizedStrings.ProcessBatch_0_THE_INPUT_KEY_FOR_THE_BATCH_UPDATE_REQUEST_1_IS_NULL; @@ -450,7 +452,7 @@ public class GatewayReceiverCommand extends BaseCommand { } region = (LocalRegion) crHelper.getRegion(regionName); if (region == null) { - handleRegionNull(servConn, regionName, batchId); + handleRegionNull(serverConnection, regionName, batchId); } else { clientEvent = new EventIDHolder(eventId); if (versionTimeStamp > 0) { @@ -465,7 +467,7 @@ public class GatewayReceiverCommand extends BaseCommand { try { byte[] value = valuePart.getSerializedForm(); boolean isObject = valuePart.isObject(); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { PutOperationContext putContext = authzRequest.putAuthorize(regionName, key, value, isObject, callbackArg, PutOperationContext.UPDATE); @@ -477,14 +479,14 @@ public class GatewayReceiverCommand extends BaseCommand { result = addPdxType(crHelper, key, value); } else { result = region.basicBridgePut(key, value, null, isObject, callbackArg, - servConn.getProxyID(), false, clientEvent); + serverConnection.getProxyID(), false, clientEvent); } if (result || clientEvent.isConcurrencyConflict()) { - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); stats.incUpdateRequest(); } else { - final Object[] msgArgs = - new Object[] {servConn.getName(), regionName, key, valuePart, callbackArg}; + final Object[] msgArgs = new Object[] {serverConnection.getName(), regionName, + key, valuePart, callbackArg}; final StringId message = LocalizedStrings.ProcessBatch_0_FAILED_TO_UPDATE_ENTRY_FOR_REGION_1_KEY_2_VALUE_3_AND_CALLBACKARG_4; String s = message.toLocalizedString(msgArgs); @@ -493,16 +495,17 @@ public class GatewayReceiverCommand extends BaseCommand { } } catch (CancelException e) { // FIXME better exception hierarchy would avoid this check - if (servConn.getCachedRegionHelper().getCache().getCancelCriterion() + if (serverConnection.getCachedRegionHelper().getCache().getCancelCriterion() .isCancelInProgress()) { if (logger.isDebugEnabled()) { logger.debug( "{} ignoring message of type {} from client {} because shutdown occurred during message processing.", - servConn.getName(), MessageType.getString(msg.getMessageType()), - servConn.getProxyID()); + serverConnection.getName(), + MessageType.getString(clientMessage.getMessageType()), + serverConnection.getProxyID()); } - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(e); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(e); } else { throw e; } @@ -511,7 +514,7 @@ public class GatewayReceiverCommand extends BaseCommand { // Preserve the connection under all circumstances logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; @@ -521,28 +524,29 @@ public class GatewayReceiverCommand extends BaseCommand { case 2: // Destroy // Retrieve the callbackArg from the message parts if necessary index = partNumber + 5; - callbackArgExistsPart = msg.getPart(index++); { + callbackArgExistsPart = clientMessage.getPart(index++); { byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); callbackArgExists = partBytes[0] == 0x01; } if (callbackArgExists) { - callbackArgPart = msg.getPart(index++); + callbackArgPart = clientMessage.getPart(index++); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_DESTROY_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; } } - versionTimeStamp = msg.getPart(index++).getLong(); + versionTimeStamp = clientMessage.getPart(index++).getLong(); if (logger.isDebugEnabled()) { logger.debug("{}: Processing batch destroy request {} on {} for region {} key {}", - servConn.getName(), batchId, servConn.getSocketString(), regionName, key); + serverConnection.getName(), batchId, serverConnection.getSocketString(), + regionName, key); } // Process the destroy request @@ -556,14 +560,15 @@ public class GatewayReceiverCommand extends BaseCommand { message = LocalizedStrings.ProcessBatch_0_THE_INPUT_REGION_NAME_FOR_THE_BATCH_DESTROY_REQUEST_1_IS_NULL; } - Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId)}; + Object[] messageArgs = + new Object[] {serverConnection.getName(), Integer.valueOf(batchId)}; String s = message.toLocalizedString(messageArgs); logger.warn(s); throw new Exception(s); } region = (LocalRegion) crHelper.getRegion(regionName); if (region == null) { - handleRegionNull(servConn, regionName, batchId); + handleRegionNull(serverConnection, regionName, batchId); } else { clientEvent = new EventIDHolder(eventId); if (versionTimeStamp > 0) { @@ -576,20 +581,20 @@ public class GatewayReceiverCommand extends BaseCommand { handleMessageRetry(region, clientEvent); // Destroy the entry try { - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { DestroyOperationContext destroyContext = authzRequest.destroyAuthorize(regionName, key, callbackArg); callbackArg = destroyContext.getCallbackArg(); } - region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), false, + region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), false, clientEvent); - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); stats.incDestroyRequest(); } catch (EntryNotFoundException e) { logger.info(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_DURING_BATCH_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1, - new Object[] {servConn.getName(), key})); + new Object[] {serverConnection.getName(), key})); // throw new Exception(e); } } @@ -598,52 +603,52 @@ public class GatewayReceiverCommand extends BaseCommand { try { // Region name - regionNamePart = msg.getPart(partNumber + 2); + regionNamePart = clientMessage.getPart(partNumber + 2); regionName = regionNamePart.getString(); // Retrieve the event id from the message parts - eventIdPart = msg.getPart(partNumber + 3); + eventIdPart = clientMessage.getPart(partNumber + 3); eventId = (EventID) eventIdPart.getObject(); // Retrieve the key from the message parts - keyPart = msg.getPart(partNumber + 4); + keyPart = clientMessage.getPart(partNumber + 4); key = keyPart.getStringOrObject(); // Retrieve the callbackArg from the message parts if necessary index = partNumber + 5; - callbackArgExistsPart = msg.getPart(index++); + callbackArgExistsPart = clientMessage.getPart(index++); byte[] partBytes = (byte[]) callbackArgExistsPart.getObject(); callbackArgExists = partBytes[0] == 0x01; if (callbackArgExists) { - callbackArgPart = msg.getPart(index++); + callbackArgPart = clientMessage.getPart(index++); callbackArg = callbackArgPart.getObject(); } } catch (Exception e) { logger.warn(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS, - new Object[] {servConn.getName(), Integer.valueOf(batchId), + new Object[] {serverConnection.getName(), Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}), e); throw e; } - versionTimeStamp = msg.getPart(index++).getLong(); + versionTimeStamp = clientMessage.getPart(index++).getLong(); if (logger.isDebugEnabled()) { logger.debug( "{}: Processing batch update-version request {} on {} for region {} key {} value {} callbackArg {}", - servConn.getName(), batchId, servConn.getSocketString(), regionName, key, - valuePart, callbackArg); + serverConnection.getName(), batchId, serverConnection.getSocketString(), + regionName, key, valuePart, callbackArg); } // Process the update time-stamp request if (key == null || regionName == null) { StringId message = LocalizedStrings.ProcessBatch_0_CAUGHT_EXCEPTION_PROCESSING_BATCH_UPDATE_VERSION_REQUEST_1_CONTAINING_2_EVENTS; - Object[] messageArgs = new Object[] {servConn.getName(), Integer.valueOf(batchId), - Integer.valueOf(numberOfEvents)}; + Object[] messageArgs = new Object[] {serverConnection.getName(), + Integer.valueOf(batchId), Integer.valueOf(numberOfEvents)}; String s = message.toLocalizedString(messageArgs); logger.warn(s); throw new Exception(s); @@ -652,7 +657,7 @@ public class GatewayReceiverCommand extends BaseCommand { region = (LocalRegion) crHelper.getRegion(regionName); if (region == null) { - handleRegionNull(servConn, regionName, batchId); + handleRegionNull(serverConnection, regionName, batchId); } else { clientEvent = new EventIDHolder(eventId); @@ -668,13 +673,13 @@ public class GatewayReceiverCommand extends BaseCommand { // Update the version tag try { - region.basicBridgeUpdateVersionStamp(key, callbackArg, servConn.getProxyID(), - false, clientEvent); + region.basicBridgeUpdateVersionStamp(key, callbackArg, + serverConnection.getProxyID(), false, clientEvent); } catch (EntryNotFoundException e) { logger.info(LocalizedMessage.create( LocalizedStrings.ProcessBatch_0_DURING_BATCH_UPDATE_VERSION_NO_ENTRY_WAS_FOUND_FOR_KEY_1, - new Object[] {servConn.getName(), key})); + new Object[] {serverConnection.getName(), key})); // throw new Exception(e); } } @@ -684,29 +689,29 @@ public class GatewayReceiverCommand extends BaseCommand { default: logger.fatal(LocalizedMessage.create( LocalizedStrings.Processbatch_0_UNKNOWN_ACTION_TYPE_1_FOR_BATCH_FROM_2, - new Object[] {servConn.getName(), Integer.valueOf(actionType), - servConn.getSocketString()})); + new Object[] {serverConnection.getName(), Integer.valueOf(actionType), + serverConnection.getSocketString()})); stats.incUnknowsOperationsReceived(); } } catch (CancelException e) { if (logger.isDebugEnabled()) { logger.debug( "{} ignoring message of type {} from client {} because shutdown occurred during message processing.", - servConn.getName(), MessageType.getString(msg.getMessageType()), - servConn.getProxyID()); + serverConnection.getName(), MessageType.getString(clientMessage.getMessageType()), + serverConnection.getProxyID()); } - servConn.setFlagProcessMessagesAsFalse(); - servConn.setClientDisconnectedException(e); + serverConnection.setFlagProcessMessagesAsFalse(); + serverConnection.setClientDisconnectedException(e); return; } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // If we have an issue with the PDX registry, stop processing more data if (e.getCause() instanceof PdxRegistryMismatchException) { fatalException = e.getCause(); logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_PDX_CONFIGURATION, - new Object[] {servConn.getMembershipID()}), e.getCause()); + new Object[] {serverConnection.getMembershipID()}), e.getCause()); break; } @@ -772,26 +777,26 @@ public class GatewayReceiverCommand extends BaseCommand { stats.incProcessBatchTime(start - oldStart); } if (fatalException != null) { - servConn.incrementLatestBatchIdReplied(batchId); - writeFatalException(msg, fatalException, servConn, batchId); - servConn.setAsTrue(RESPONDED); + serverConnection.incrementLatestBatchIdReplied(batchId); + writeFatalException(clientMessage, fatalException, serverConnection, batchId); + serverConnection.setAsTrue(RESPONDED); } else if (!exceptions.isEmpty()) { - servConn.incrementLatestBatchIdReplied(batchId); - writeBatchException(msg, exceptions, servConn, batchId); - servConn.setAsTrue(RESPONDED); + serverConnection.incrementLatestBatchIdReplied(batchId); + writeBatchException(clientMessage, exceptions, serverConnection, batchId); + serverConnection.setAsTrue(RESPONDED); } else if (!wroteResponse) { // Increment the batch id unless the received batch id is -1 (a failover // batch) - servConn.incrementLatestBatchIdReplied(batchId); + serverConnection.incrementLatestBatchIdReplied(batchId); - writeReply(msg, servConn, batchId, numberOfEvents); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection, batchId, numberOfEvents); + serverConnection.setAsTrue(RESPONDED); stats.incWriteProcessBatchResponseTime(DistributionStats.getStatTime() - start); if (logger.isDebugEnabled()) { logger.debug( "{}: Sent process batch normal response for batch {} containing {} events ({} bytes) with {} acknowledgement on {}", - servConn.getName(), batchId, numberOfEvents, msg.getPayloadLength(), - (earlyAck ? "early" : "normal"), servConn.getSocketString()); + serverConnection.getName(), batchId, numberOfEvents, clientMessage.getPayloadLength(), + (earlyAck ? "early" : "normal"), serverConnection.getSocketString()); } // logger.warn("Sent process batch normal response for batch " + // batchId + " containing " + numberOfEvents + " events (" + http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java index 5cb1e41..2ca8804 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java @@ -54,17 +54,17 @@ public class Get70 extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long startparam) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startparam) throws IOException { long start = startparam; Part regionNamePart = null, keyPart = null, valuePart = null; String regionName = null; Object callbackArg = null, key = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); StringId errMessage = null; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // requiresResponse = true; { long oldStart = start; @@ -72,18 +72,18 @@ public class Get70 extends BaseCommand { stats.incReadGetRequestTime(start - oldStart); } // Retrieve the data from the message parts - int parts = msg.getNumberOfParts(); - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); + int parts = clientMessage.getNumberOfParts(); + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); // valuePart = null; (redundant assignment) if (parts > 2) { - valuePart = msg.getPart(2); + valuePart = clientMessage.getPart(2); try { callbackArg = valuePart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -91,15 +91,15 @@ public class Get70 extends BaseCommand { try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); // responded = true; - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received 7.0 get request ({} bytes) from {} for region {} key {} txId {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key, - msg.getTransactionId()); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), regionName, key, clientMessage.getTransactionId()); } // Process the get request @@ -113,18 +113,18 @@ public class Get70 extends BaseCommand { errMessage = LocalizedStrings.Request_THE_INPUT_REGION_NAME_FOR_THE_GET_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeErrorResponse(msg, MessageType.REQUESTDATAERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeErrorResponse(clientMessage, MessageType.REQUESTDATAERROR, s, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - Region region = servConn.getCache().getRegion(regionName); + Region region = serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.Request__0_WAS_NOT_FOUND_DURING_GET_REQUEST .toLocalizedString(regionName); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -133,14 +133,14 @@ public class Get70 extends BaseCommand { // for integrated security this.securityService.authorizeRegionRead(regionName, key.toString()); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { getContext = authzRequest.getAuthorize(regionName, key, callbackArg); callbackArg = getContext.getCallbackArg(); } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -148,10 +148,10 @@ public class Get70 extends BaseCommand { // the value if it is a byte[]. Entry entry; try { - entry = getEntry(region, key, callbackArg, servConn); + entry = getEntry(region, key, callbackArg, serverConnection); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -164,7 +164,7 @@ public class Get70 extends BaseCommand { boolean keyNotPresent = entry.keyNotPresent; try { - AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest(); + AuthorizeRequestPP postAuthzRequest = serverConnection.getPostAuthzRequest(); if (postAuthzRequest != null) { try { getContext = postAuthzRequest.getAuthorize(regionName, key, data, isObject, getContext); @@ -182,8 +182,8 @@ public class Get70 extends BaseCommand { } } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -197,23 +197,25 @@ public class Get70 extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { - writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject, servConn, pr, - pr.getNetworkHopType(), versionTag, keyNotPresent); + writeResponseWithRefreshMetadata(data, callbackArg, clientMessage, isObject, + serverConnection, pr, pr.getNetworkHopType(), versionTag, keyNotPresent); pr.clearNetworkHopData(); } else { - writeResponse(data, callbackArg, msg, isObject, versionTag, keyNotPresent, servConn); + writeResponse(data, callbackArg, clientMessage, isObject, versionTag, keyNotPresent, + serverConnection); } } else { - writeResponse(data, callbackArg, msg, isObject, versionTag, keyNotPresent, servConn); + writeResponse(data, callbackArg, clientMessage, isObject, versionTag, keyNotPresent, + serverConnection); } } finally { OffHeapHelper.release(originalData); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Wrote get response back to {} for region {} {}", servConn.getName(), - servConn.getSocketString(), regionName, entry); + logger.debug("{}: Wrote get response back to {} for region {} {}", serverConnection.getName(), + serverConnection.getSocketString(), regionName, entry); } stats.incWriteGetResponseTime(DistributionStats.getStatTime() - start); @@ -379,12 +381,12 @@ public class Get70 extends BaseCommand { } @Override - protected void writeReply(Message origMsg, ServerConnection servConn) throws IOException { + protected void writeReply(Message origMsg, ServerConnection serverConnection) throws IOException { throw new UnsupportedOperationException(); } @Override - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn, + protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, PartitionedRegion pr, byte nwHop) throws IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java index 22e63c6..01c5c9c 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll.java @@ -44,33 +44,34 @@ public class GetAll extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keysPart = null; String regionName = null; Object[] keys = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // Retrieve the region name from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the keys array from the message parts - keysPart = msg.getPart(1); + keysPart = clientMessage.getPart(1); try { keys = (Object[]) keysPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { StringBuffer buffer = new StringBuffer(); - buffer.append(servConn.getName()).append(": Received getAll request (") - .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString()) - .append(" for region ").append(regionName).append(" keys "); + buffer.append(serverConnection.getName()).append(": Received getAll request (") + .append(clientMessage.getPayloadLength()).append(" bytes) from ") + .append(serverConnection.getSocketString()).append(" for region ").append(regionName) + .append(" keys "); if (keys != null) { for (int i = 0; i < keys.length; i++) { buffer.append(keys[i]).append(" "); @@ -91,37 +92,38 @@ public class GetAll extends BaseCommand { message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL .toLocalizedString(); } - logger.warn("{}: {}", servConn.getName(), message); - writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message); + writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during getAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Send header - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); chunkedResponseMsg.setMessageType(MessageType.RESPONSE); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendGetAllResponseChunks(region, regionName, keys, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -142,14 +144,14 @@ public class GetAll extends BaseCommand { numKeys = allKeys.size(); } - ObjectPartList values = new ObjectPartList(maximumChunkSize, keys == null); + ObjectPartList values = new ObjectPartList(MAXIMUM_CHUNK_SIZE, keys == null); AuthorizeRequest authzRequest = servConn.getAuthzRequest(); AuthorizeRequestPP postAuthzRequest = servConn.getPostAuthzRequest(); Request request = (Request) Request.getCommand(); Object[] valueAndIsObject = new Object[3]; for (int i = 0; i < numKeys; i++) { // Send the intermediate chunk if necessary - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list sendGetAllResponseChunk(region, values, false, servConn); values.clear(); @@ -246,7 +248,7 @@ public class GetAll extends BaseCommand { ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); chunkedResponseMsg.setNumberOfParts(1); chunkedResponseMsg.setLastChunk(lastChunk); - chunkedResponseMsg.addObjPart(list, zipValues); + chunkedResponseMsg.addObjPart(list, false); if (logger.isDebugEnabled()) { logger.debug("{}: Sending {} getAll response chunk for region={} values={} chunk=<{}>", http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java index a19d540..ad8ef49 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll651.java @@ -21,7 +21,6 @@ import java.util.Set; import org.apache.geode.cache.Region; import org.apache.geode.cache.operations.GetOperationContext; import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -45,33 +44,34 @@ public class GetAll651 extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keysPart = null; String regionName = null; Object[] keys = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // Retrieve the region name from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); // Retrieve the keys array from the message parts - keysPart = msg.getPart(1); + keysPart = clientMessage.getPart(1); try { keys = (Object[]) keysPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { StringBuffer buffer = new StringBuffer(); - buffer.append(servConn.getName()).append(": Received getAll request (") - .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString()) - .append(" for region ").append(regionName).append(" keys "); + buffer.append(serverConnection.getName()).append(": Received getAll request (") + .append(clientMessage.getPayloadLength()).append(" bytes) from ") + .append(serverConnection.getSocketString()).append(" for region ").append(regionName) + .append(" keys "); if (keys != null) { for (int i = 0; i < keys.length; i++) { buffer.append(keys[i]).append(" "); @@ -90,37 +90,38 @@ public class GetAll651 extends BaseCommand { message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL .toLocalizedString(); } - logger.warn("{}: {}", servConn.getName(), message); - writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message); + writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during getAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Send header - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); chunkedResponseMsg.setMessageType(MessageType.RESPONSE); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendGetAllResponseChunks(region, regionName, keys, servConn); - servConn.setAsTrue(RESPONDED); + fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -148,7 +149,7 @@ public class GetAll651 extends BaseCommand { final boolean isDebugEnabled = logger.isDebugEnabled(); for (int i = 0; i < numKeys; i++) { // Send the intermediate chunk if necessary - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list sendGetAllResponseChunk(region, values, false, servConn); values.clear(); @@ -253,7 +254,7 @@ public class GetAll651 extends BaseCommand { * @param includeKeys if the part list should include the keys */ protected ObjectPartList651 getObjectPartsList(boolean includeKeys) { - ObjectPartList651 values = new ObjectPartList651(maximumChunkSize, includeKeys); + ObjectPartList651 values = new ObjectPartList651(MAXIMUM_CHUNK_SIZE, includeKeys); return values; } @@ -262,7 +263,7 @@ public class GetAll651 extends BaseCommand { ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); chunkedResponseMsg.setNumberOfParts(1); chunkedResponseMsg.setLastChunk(lastChunk); - chunkedResponseMsg.addObjPart(list, zipValues); + chunkedResponseMsg.addObjPart(list, false); if (logger.isDebugEnabled()) { logger.debug("{}: Sending {} getAll response chunk for region={} values={} chunk=<{}>", http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java index 154e800..267a5b2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAll70.java @@ -23,7 +23,6 @@ import org.apache.geode.cache.operations.GetOperationContext; import org.apache.geode.cache.operations.internal.GetOperationContextImpl; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -40,7 +39,6 @@ import org.apache.geode.internal.offheap.OffHeapHelper; import org.apache.geode.internal.offheap.annotations.Retained; import org.apache.geode.internal.security.AuthorizeRequest; import org.apache.geode.internal.security.AuthorizeRequestPP; -import org.apache.geode.internal.security.SecurityService; import org.apache.geode.security.NotAuthorizedException; public class GetAll70 extends BaseCommand { @@ -52,36 +50,37 @@ public class GetAll70 extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, keysPart = null; String regionName = null; Object[] keys = null; - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); int partIdx = 0; // Retrieve the region name from the message parts - regionNamePart = msg.getPart(partIdx++); + regionNamePart = clientMessage.getPart(partIdx++); regionName = regionNamePart.getString(); // Retrieve the keys array from the message parts - keysPart = msg.getPart(partIdx++); + keysPart = clientMessage.getPart(partIdx++); try { keys = (Object[]) keysPart.getObject(); } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean requestSerializedValues; - requestSerializedValues = msg.getPart(partIdx++).getInt() == 1; + requestSerializedValues = clientMessage.getPart(partIdx++).getInt() == 1; if (logger.isDebugEnabled()) { StringBuffer buffer = new StringBuffer(); - buffer.append(servConn.getName()).append(": Received getAll request (") - .append(msg.getPayloadLength()).append(" bytes) from ").append(servConn.getSocketString()) - .append(" for region ").append(regionName).append(" keys "); + buffer.append(serverConnection.getName()).append(": Received getAll request (") + .append(clientMessage.getPayloadLength()).append(" bytes) from ") + .append(serverConnection.getSocketString()).append(" for region ").append(regionName) + .append(" keys "); if (keys != null) { for (int i = 0; i < keys.length; i++) { buffer.append(keys[i]).append(" "); @@ -100,37 +99,39 @@ public class GetAll70 extends BaseCommand { message = LocalizedStrings.GetAll_THE_INPUT_REGION_NAME_FOR_THE_GETALL_REQUEST_IS_NULL .toLocalizedString(); } - logger.warn("{}: {}", servConn.getName(), message); - writeChunkedErrorResponse(msg, MessageType.GET_ALL_DATA_ERROR, message, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), message); + writeChunkedErrorResponse(clientMessage, MessageType.GET_ALL_DATA_ERROR, message, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during getAll request"; - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Send header - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); chunkedResponseMsg.setMessageType(MessageType.RESPONSE); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); // Send chunk response try { - fillAndSendGetAllResponseChunks(region, regionName, keys, servConn, requestSerializedValues); - servConn.setAsTrue(RESPONDED); + fillAndSendGetAllResponseChunks(region, regionName, keys, serverConnection, + requestSerializedValues); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // Otherwise, write an exception message and continue - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -163,7 +164,7 @@ public class GetAll70 extends BaseCommand { // in the old mode (which may be impossible since we only used that mode pre 7.0) in which the // client told us // to get and return all the keys and values. I think this was used for register interest. - VersionedObjectList values = new VersionedObjectList(maximumChunkSize, keys == null, + VersionedObjectList values = new VersionedObjectList(MAXIMUM_CHUNK_SIZE, keys == null, region.getAttributes().getConcurrencyChecksEnabled(), requestSerializedValues); try { AuthorizeRequest authzRequest = servConn.getAuthzRequest(); @@ -172,7 +173,7 @@ public class GetAll70 extends BaseCommand { final boolean isDebugEnabled = logger.isDebugEnabled(); for (int i = 0; i < numKeys; i++) { // Send the intermediate chunk if necessary - if (values.size() == maximumChunkSize) { + if (values.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list values.setKeys(null); sendGetAllResponseChunk(region, values, false, servConn); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java index d380beb..43d3348 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetAllForRI.java @@ -38,7 +38,7 @@ public class GetAllForRI extends GetAll651 { @Override protected ObjectPartList651 getObjectPartsList(boolean includeKeys) { - return new SerializedObjectPartList(maximumChunkSize, includeKeys); + return new SerializedObjectPartList(MAXIMUM_CHUNK_SIZE, includeKeys); }