http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java index a579170..cd12ea7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RollbackCommand.java @@ -39,18 +39,18 @@ public class RollbackCommand extends BaseCommand { private RollbackCommand() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { - servConn.setAsTrue(REQUIRES_RESPONSE); - TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); InternalDistributedMember client = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); - int uniqId = msg.getTransactionId(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); + int uniqId = clientMessage.getTransactionId(); TXId txId = new TXId(client, uniqId); if (txMgr.isHostedTxRecentlyCompleted(txId)) { if (logger.isDebugEnabled()) { logger.debug("TX: found a recently rolled back tx: {}", txId); - sendRollbackReply(msg, servConn); + sendRollbackReply(clientMessage, serverConnection); txMgr.removeHostedTXState(txId); return; } @@ -60,16 +60,16 @@ public class RollbackCommand extends BaseCommand { if (txState != null) { txId = txState.getTxId(); txMgr.rollback(); - sendRollbackReply(msg, servConn); + sendRollbackReply(clientMessage, serverConnection); } else { // could not find TxState in the host server. // Protect against a failover command received so late, // and it is removed from the failoverMap due to capacity. - sendRollbackReply(msg, servConn); + sendRollbackReply(clientMessage, serverConnection); } } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); } finally { if (logger.isDebugEnabled()) { logger.debug("TX: removing tx state for {}", txId);
http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java index c78f4d9..42e14a3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Size.java @@ -56,18 +56,18 @@ public class Size extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { StringBuilder errMessage = new StringBuilder(); - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadSizeRequestTime(start - oldStart); // Retrieve the data from the message parts - Part regionNamePart = msg.getPart(0); + Part regionNamePart = clientMessage.getPart(0); String regionName = regionNamePart.getString(); if (regionName == null) { @@ -76,8 +76,8 @@ public class Size extends BaseCommand { errMessage .append(LocalizedStrings.BaseCommand__THE_INPUT_REGION_NAME_FOR_THE_0_REQUEST_IS_NULL .toLocalizedString("size")); - writeErrorResponse(msg, MessageType.SIZE_ERROR, errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.SIZE_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -85,38 +85,38 @@ public class Size extends BaseCommand { if (region == null) { String reason = LocalizedStrings.BaseCommand__0_WAS_NOT_FOUND_DURING_1_REQUEST .toLocalizedString(regionName, "size"); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } // Size the entry try { this.securityService.authorizeRegionRead(regionName); - writeSizeResponse(region.size(), msg, servConn); + writeSizeResponse(region.size(), clientMessage, serverConnection); } catch (RegionDestroyedException rde) { - writeException(msg, rde, false, servConn); + writeException(clientMessage, rde, false, serverConnection); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // If an exception occurs during the destroy, preserve the connection - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); if (e instanceof GemFireSecurityException) { // Fine logging for security exceptions since these are already // logged by the security logger if (logger.isDebugEnabled()) { - logger.debug("{}: Unexpected Security exception", servConn.getName(), e); + logger.debug("{}: Unexpected Security exception", serverConnection.getName(), e); } } else { logger.warn(LocalizedMessage.create(LocalizedStrings.BaseCommand_0_UNEXPECTED_EXCEPTION, - servConn.getName()), e); + serverConnection.getName()), e); } } finally { if (logger.isDebugEnabled()) { - logger.debug("{}: Sent size response for region {}", servConn.getName(), regionName); + logger.debug("{}: Sent size response for region {}", serverConnection.getName(), regionName); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); stats.incWriteSizeResponseTime(DistributionStats.getStatTime() - start); } } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java index 72eab50..9fc3fd1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java @@ -49,23 +49,23 @@ public class TXFailoverCommand extends BaseCommand { private TXFailoverCommand() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // Build the TXId for the transaction InternalDistributedMember client = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); - int uniqId = msg.getTransactionId(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); + int uniqId = clientMessage.getTransactionId(); if (logger.isDebugEnabled()) { logger.debug("TX: Transaction {} from {} is failing over to this server", uniqId, client); } TXId txId = new TXId(client, uniqId); - TXManagerImpl mgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager(); + TXManagerImpl mgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); mgr.waitForCompletingTransaction(txId); // in case it's already completing here in another // thread if (mgr.isHostedTxRecentlyCompleted(txId)) { - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); mgr.removeHostedTXState(txId); return; } @@ -75,7 +75,7 @@ public class TXFailoverCommand extends BaseCommand { if (!tx.isRealDealLocal()) { // send message to all peers to find out who hosts the transaction FindRemoteTXMessageReplyProcessor processor = - FindRemoteTXMessage.send(servConn.getCache(), txId); + FindRemoteTXMessage.send(serverConnection.getCache(), txId); try { processor.waitForRepliesUninterruptibly(); } catch (ReplyException e) { @@ -96,7 +96,7 @@ public class TXFailoverCommand extends BaseCommand { // bug #42228 and bug #43504 - this cannot return until the current view // has been installed by all members, so that dlocks are released and // the same keys can be used in a new transaction by the same client thread - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); try { WaitForViewInstallation.send((DistributionManager) cache.getDistributionManager()); } catch (InterruptedException e) { @@ -110,9 +110,9 @@ public class TXFailoverCommand extends BaseCommand { } mgr.saveTXCommitMessageForClientFailover(txId, processor.getTxCommitMessage()); } else { - writeException(msg, new TransactionDataNodeHasDepartedException( - "Could not find transaction host for " + txId), false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, new TransactionDataNodeHasDepartedException( + "Could not find transaction host for " + txId), false, serverConnection); + serverConnection.setAsTrue(RESPONDED); mgr.removeHostedTXState(txId); return; } @@ -121,8 +121,8 @@ public class TXFailoverCommand extends BaseCommand { if (!wasInProgress) { mgr.setInProgress(false); } - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); } } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java index 8cedd2c..c5b9fc5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java @@ -15,7 +15,6 @@ package org.apache.geode.internal.cache.tier.sockets.command; -import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ReplyException; @@ -54,7 +53,7 @@ public class TXSynchronizationCommand extends BaseCommand { * org.apache.geode.internal.cache.tier.sockets.ServerConnection) */ @Override - protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) { + protected boolean shouldMasqueradeForTx(Message clientMessage, ServerConnection serverConnection) { // masquerading is done in the waiting thread pool return false; } @@ -68,26 +67,26 @@ public class TXSynchronizationCommand extends BaseCommand { * long) */ @Override - public void cmdExecute(final Message msg, final ServerConnection servConn, long start) + public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException, InterruptedException { - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); - CompletionType type = CompletionType.values()[msg.getPart(0).getInt()]; - /* int txIdInt = */ msg.getPart(1).getInt(); // [bruce] not sure if we need to transmit this + CompletionType type = CompletionType.values()[clientMessage.getPart(0).getInt()]; + /* int txIdInt = */ clientMessage.getPart(1).getInt(); // [bruce] not sure if we need to transmit this final Part statusPart; if (type == CompletionType.AFTER_COMPLETION) { - statusPart = msg.getPart(2); + statusPart = clientMessage.getPart(2); } else { statusPart = null; } - final TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager(); + final TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); final InternalDistributedMember member = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); // get the tx state without associating it with this thread. That's done later - final TXStateProxy txProxy = txMgr.masqueradeAs(msg, member, true); + final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true); // we have to run beforeCompletion and afterCompletion in the same thread // because beforeCompletion obtains locks for the thread and afterCompletion @@ -102,21 +101,21 @@ public class TXSynchronizationCommand extends BaseCommand { TXStateProxy txState = null; Throwable failureException = null; try { - txState = txMgr.masqueradeAs(msg, member, false); + txState = txMgr.masqueradeAs(clientMessage, member, false); if (isDebugEnabled) { logger.debug("Executing beforeCompletion() notification for transaction {}", - msg.getTransactionId()); + clientMessage.getTransactionId()); } txState.setIsJTA(true); txState.beforeCompletion(); try { - writeReply(msg, servConn); + writeReply(clientMessage, serverConnection); } catch (IOException e) { if (isDebugEnabled) { logger.debug("Problem writing reply to client", e); } } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } catch (ReplyException e) { failureException = e.getCause(); } catch (InterruptedException e) { @@ -128,13 +127,13 @@ public class TXSynchronizationCommand extends BaseCommand { } if (failureException != null) { try { - writeException(msg, failureException, false, servConn); + writeException(clientMessage, failureException, false, serverConnection); } catch (IOException ioe) { if (isDebugEnabled) { logger.debug("Problem writing reply to client", ioe); } } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } }; @@ -150,11 +149,11 @@ public class TXSynchronizationCommand extends BaseCommand { public void run() { TXStateProxy txState = null; try { - txState = txMgr.masqueradeAs(msg, member, false); + txState = txMgr.masqueradeAs(clientMessage, member, false); int status = statusPart.getInt(); if (isDebugEnabled) { logger.debug("Executing afterCompletion({}) notification for transaction {}", - status, msg.getTransactionId()); + status, clientMessage.getTransactionId()); } txState.setIsJTA(true); txState.afterCompletion(status); @@ -162,7 +161,7 @@ public class TXSynchronizationCommand extends BaseCommand { // where it can be applied to the local cache TXCommitMessage cmsg = txState.getCommitMessage(); try { - CommitCommand.writeCommitResponse(cmsg, msg, servConn); + CommitCommand.writeCommitResponse(cmsg, clientMessage, serverConnection); txMgr.removeHostedTXState(txState.getTxId()); } catch (IOException e) { // not much can be done here @@ -170,16 +169,16 @@ public class TXSynchronizationCommand extends BaseCommand { logger.warn("Problem writing reply to client", e); } } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } catch (RuntimeException e) { try { - writeException(msg, e, false, servConn); + writeException(clientMessage, e, false, serverConnection); } catch (IOException ioe) { if (isDebugEnabled) { logger.debug("Problem writing reply to client", ioe); } } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { @@ -195,12 +194,12 @@ public class TXSynchronizationCommand extends BaseCommand { sync.runSecondRunnable(afterCompletion); } else { if (statusPart.getInt() == Status.STATUS_COMMITTED) { - TXStateProxy txState = txMgr.masqueradeAs(msg, member, false); + TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member, false); try { if (isDebugEnabled) { logger.debug( "Executing beforeCompletion() notification for transaction {} after failover", - msg.getTransactionId()); + clientMessage.getTransactionId()); } txState.setIsJTA(true); txState.beforeCompletion(); @@ -212,8 +211,8 @@ public class TXSynchronizationCommand extends BaseCommand { } } } catch (Exception e) { - writeException(msg, MessageType.EXCEPTION, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, MessageType.EXCEPTION, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); } if (isDebugEnabled) { logger.debug("Sent tx synchronization response"); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java index 7dbb78f..597f92b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterest.java @@ -45,43 +45,43 @@ public class UnregisterInterest extends BaseCommand { UnregisterInterest() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws ClassNotFoundException, IOException { Part regionNamePart = null, keyPart = null; String regionName = null; Object key = null; int interestType = 0; StringId errMessage = null; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); - regionNamePart = msg.getPart(0); - interestType = msg.getPart(1).getInt(); - keyPart = msg.getPart(2); - Part isClosingPart = msg.getPart(3); + regionNamePart = clientMessage.getPart(0); + interestType = clientMessage.getPart(1).getInt(); + keyPart = clientMessage.getPart(2); + Part isClosingPart = clientMessage.getPart(3); byte[] isClosingPartBytes = (byte[]) isClosingPart.getObject(); boolean isClosing = isClosingPartBytes[0] == 0x01; regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } boolean keepalive = false; try { - Part keepalivePart = msg.getPart(4); + Part keepalivePart = clientMessage.getPart(4); byte[] keepaliveBytes = (byte[]) keepalivePart.getObject(); keepalive = keepaliveBytes[0] != 0x00; } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug( "{}: Received unregister interest request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key); } // Process the unregister interest request @@ -95,9 +95,9 @@ public class UnregisterInterest extends BaseCommand { errMessage = LocalizedStrings.UnRegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_UNREGISTER_INTEREST_REQUEST_IS_NULL; String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeErrorResponse(msg, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeErrorResponse(clientMessage, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -108,12 +108,12 @@ public class UnregisterInterest extends BaseCommand { this.securityService.authorizeRegionRead(regionName, key.toString()); } } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { try { @@ -121,8 +121,8 @@ public class UnregisterInterest extends BaseCommand { authzRequest.unregisterInterestAuthorize(regionName, key, interestType); key = unregisterContext.getKey(); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -141,17 +141,17 @@ public class UnregisterInterest extends BaseCommand { */ // Unregister interest irrelevent of whether the region is present it or // not - servConn.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, key, - interestType, isClosing, servConn.getProxyID(), keepalive); + serverConnection.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, key, + interestType, isClosing, serverConnection.getProxyID(), keepalive); // Update the statistics and write the reply // bserverStats.incLong(processDestroyTimeId, // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Sent unregister interest response for region {} key {}", servConn.getName(), + logger.debug("{}: Sent unregister interest response for region {} key {}", serverConnection.getName(), regionName, key); } // bserverStats.incLong(writeDestroyResponseTimeId, http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java index 7369587..76cbba2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UnregisterInterestList.java @@ -46,48 +46,48 @@ public class UnregisterInterestList extends BaseCommand { private UnregisterInterestList() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException { Part regionNamePart = null, keyPart = null, numberOfKeysPart = null; String regionName = null; Object key = null; List keys = null; int numberOfKeys = 0, partNumber = 0; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); + regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getString(); - Part isClosingListPart = msg.getPart(1); + Part isClosingListPart = clientMessage.getPart(1); byte[] isClosingListPartBytes = (byte[]) isClosingListPart.getObject(); boolean isClosingList = isClosingListPartBytes[0] == 0x01; boolean keepalive = false; try { - Part keepalivePart = msg.getPart(2); + Part keepalivePart = clientMessage.getPart(2); byte[] keepalivePartBytes = (byte[]) keepalivePart.getObject(); keepalive = keepalivePartBytes[0] == 0x01; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - numberOfKeysPart = msg.getPart(3); + numberOfKeysPart = clientMessage.getPart(3); numberOfKeys = numberOfKeysPart.getInt(); partNumber = 4; keys = new ArrayList(); for (int i = 0; i < numberOfKeys; i++) { - keyPart = msg.getPart(partNumber + i); + keyPart = clientMessage.getPart(partNumber + i); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } keys.add(key); @@ -95,7 +95,7 @@ public class UnregisterInterestList extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug( "{}: Received unregister interest request ({} bytes) from {} for the following {} keys in region {}: {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), numberOfKeys, + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), numberOfKeys, regionName, keys); } @@ -113,22 +113,22 @@ public class UnregisterInterestList extends BaseCommand { LocalizedStrings.UnRegisterInterest_THE_INPUT_REGION_NAME_FOR_THE_UNREGISTER_INTEREST_REQUEST_IS_NULL; } String s = errMessage.toLocalizedString(); - logger.warn("{}: {}", servConn.getName(), s); - writeErrorResponse(msg, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, servConn); - servConn.setAsTrue(RESPONDED); + logger.warn("{}: {}", serverConnection.getName(), s); + writeErrorResponse(clientMessage, MessageType.UNREGISTER_INTEREST_DATA_ERROR, s, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { this.securityService.authorizeRegionRead(regionName); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (!DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { try { @@ -136,8 +136,8 @@ public class UnregisterInterestList extends BaseCommand { authzRequest.unregisterInterestListAuthorize(regionName, keys); keys = (List) unregisterContext.getKey(); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -155,20 +155,20 @@ public class UnregisterInterestList extends BaseCommand { * responded = true; } else { */ // Register interest - servConn.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, keys, - isClosingList, servConn.getProxyID(), keepalive); + serverConnection.getAcceptor().getCacheClientNotifier().unregisterClientInterest(regionName, keys, + isClosingList, serverConnection.getProxyID(), keepalive); // Update the statistics and write the reply // bserverStats.incLong(processDestroyTimeId, // DistributionStats.getStatTime() - start); // start = DistributionStats.getStatTime(); WHY ARE GETTING START AND NOT // USING IT? - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { logger.debug( "{}: Sent unregister interest response for the following {} keys in region {}: {}", - servConn.getName(), numberOfKeys, regionName, keys); + serverConnection.getName(), numberOfKeys, regionName, keys); } // bserverStats.incLong(writeDestroyResponseTimeId, // DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java index 57aca22..b870a96 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/UpdateClientNotification.java @@ -35,8 +35,8 @@ public class UpdateClientNotification extends BaseCommand { private UpdateClientNotification() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - CacheServerStats stats = servConn.getCacheServerStats(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { + CacheServerStats stats = serverConnection.getCacheServerStats(); { long oldStart = start; start = DistributionStats.getStatTime(); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java index ac9b5da..72719b2 100644 --- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java +++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseCQ.java @@ -44,30 +44,30 @@ public class CloseCQ extends BaseCQCommand { private CloseCQ() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - ClientProxyMembershipID id = servConn.getProxyID(); - CacheServerStats stats = servConn.getCacheServerStats(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + ClientProxyMembershipID id = serverConnection.getProxyID(); + CacheServerStats stats = serverConnection.getCacheServerStats(); // Based on MessageType.QUERY // Added by Rao 2/1/2007 - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - String cqName = msg.getPart(0).getString(); + String cqName = clientMessage.getPart(0).getString(); if (logger.isDebugEnabled()) { - logger.debug("{}: Received close CQ request from {} cqName: {}", servConn.getName(), - servConn.getSocketString(), cqName); + logger.debug("{}: Received close CQ request from {} cqName: {}", serverConnection.getName(), + serverConnection.getSocketString(), cqName); } // Process the query request if (cqName == null) { String err = LocalizedStrings.CloseCQ_THE_CQNAME_FOR_THE_CQ_CLOSE_REQUEST_IS_NULL.toLocalizedString(); - sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, servConn); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection); return; } @@ -85,7 +85,7 @@ public class CloseCQ extends BaseCQCommand { } InternalCqQuery cqQuery = cqService.getCq(serverCqName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { String queryStr = null; Set cqRegionNames = null; @@ -102,22 +102,22 @@ public class CloseCQ extends BaseCQCommand { // getMembershipID()); cqService.closeCq(cqName, id); if (cqQuery != null) - servConn.removeCq(cqName, cqQuery.isDurable()); + serverConnection.removeCq(cqName, cqQuery.isDurable()); } catch (CqException cqe) { - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection); return; } catch (Exception e) { String err = LocalizedStrings.CloseCQ_EXCEPTION_WHILE_CLOSING_CQ_CQNAME_0.toLocalizedString(cqName); - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg.getTransactionId(), e, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, clientMessage.getTransactionId(), e, serverConnection); return; } // Send OK to client sendCqResponse(MessageType.REPLY, - LocalizedStrings.CloseCQ_CQ_CLOSED_SUCCESSFULLY.toLocalizedString(), msg.getTransactionId(), - null, servConn); - servConn.setAsTrue(RESPONDED); + LocalizedStrings.CloseCQ_CQ_CLOSED_SUCCESSFULLY.toLocalizedString(), clientMessage.getTransactionId(), + null, serverConnection); + serverConnection.setAsTrue(RESPONDED); { long oldStart = start; http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java index 9bddbc7..d2a4453 100644 --- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java +++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ.java @@ -52,27 +52,27 @@ public class ExecuteCQ extends BaseCQCommand { private ExecuteCQ() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { - AcceptorImpl acceptor = servConn.getAcceptor(); - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - ClientProxyMembershipID id = servConn.getProxyID(); - CacheServerStats stats = servConn.getCacheServerStats(); + AcceptorImpl acceptor = serverConnection.getAcceptor(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + ClientProxyMembershipID id = serverConnection.getProxyID(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // Retrieve the data from the message parts - String cqName = msg.getPart(0).getString(); - String cqQueryString = msg.getPart(1).getString(); - int cqState = msg.getPart(2).getInt(); + String cqName = clientMessage.getPart(0).getString(); + String cqQueryString = clientMessage.getPart(1).getString(); + int cqState = clientMessage.getPart(2).getInt(); - Part isDurablePart = msg.getPart(3); + Part isDurablePart = clientMessage.getPart(3); byte[] isDurableByte = isDurablePart.getSerializedForm(); boolean isDurable = (isDurableByte == null || isDurableByte[0] == 0) ? false : true; if (logger.isDebugEnabled()) { - logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", servConn.getName(), - MessageType.getString(msg.getMessageType()), servConn.getSocketString(), cqName, + logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", serverConnection.getName(), + MessageType.getString(clientMessage.getMessageType()), serverConnection.getSocketString(), cqName, cqQueryString); } @@ -87,7 +87,7 @@ public class ExecuteCQ extends BaseCQCommand { qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService(); // Authorization check - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { query = qService.newQuery(cqQueryString); cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null); @@ -108,10 +108,10 @@ public class ExecuteCQ extends BaseCQCommand { cqQuery = cqServiceForExec.executeCq(cqName, cqQueryString, cqState, id, acceptor.getCacheClientNotifier(), isDurable, false, 0, null); } catch (CqException cqe) { - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection); return; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); + writeChunkedException(clientMessage, e, serverConnection); return; } @@ -119,7 +119,7 @@ public class ExecuteCQ extends BaseCQCommand { boolean sendResults = false; boolean successQuery = false; - if (msg.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) { + if (clientMessage.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) { sendResults = true; } @@ -130,8 +130,8 @@ public class ExecuteCQ extends BaseCQCommand { cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null); } ((DefaultQuery) query).setIsCqQuery(true); - successQuery = processQuery(msg, query, cqQueryString, cqRegionNames, start, cqQuery, - executeCQContext, servConn, sendResults); + successQuery = processQuery(clientMessage, query, cqQueryString, cqRegionNames, start, cqQuery, + executeCQContext, serverConnection, sendResults); // Update the CQ statistics. cqQuery.getVsdStats().setCqInitialResultsTime((DistributionStats.getStatTime()) - oldstart); @@ -153,12 +153,12 @@ public class ExecuteCQ extends BaseCQCommand { // Send OK to client sendCqResponse(MessageType.REPLY, LocalizedStrings.ExecuteCQ_CQ_CREATED_SUCCESSFULLY.toLocalizedString(), - msg.getTransactionId(), null, servConn); + clientMessage.getTransactionId(), null, serverConnection); long start2 = DistributionStats.getStatTime(); stats.incProcessCreateCqTime(start2 - oldstart); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java index de61445..805ee48 100755 --- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java +++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteCQ61.java @@ -60,30 +60,30 @@ public class ExecuteCQ61 extends BaseCQCommand { private ExecuteCQ61() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { - AcceptorImpl acceptor = servConn.getAcceptor(); - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - ClientProxyMembershipID id = servConn.getProxyID(); - CacheServerStats stats = servConn.getCacheServerStats(); + AcceptorImpl acceptor = serverConnection.getAcceptor(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + ClientProxyMembershipID id = serverConnection.getProxyID(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); // Retrieve the data from the message parts - String cqName = msg.getPart(0).getString(); - String cqQueryString = msg.getPart(1).getString(); - int cqState = msg.getPart(2).getInt(); + String cqName = clientMessage.getPart(0).getString(); + String cqQueryString = clientMessage.getPart(1).getString(); + int cqState = clientMessage.getPart(2).getInt(); - Part isDurablePart = msg.getPart(3); + Part isDurablePart = clientMessage.getPart(3); byte[] isDurableByte = isDurablePart.getSerializedForm(); boolean isDurable = (isDurableByte == null || isDurableByte[0] == 0) ? false : true; // region data policy - Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts() - 1); + Part regionDataPolicyPart = clientMessage.getPart(clientMessage.getNumberOfParts() - 1); byte[] regionDataPolicyPartBytes = regionDataPolicyPart.getSerializedForm(); if (logger.isDebugEnabled()) { - logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", servConn.getName(), - MessageType.getString(msg.getMessageType()), servConn.getSocketString(), cqName, + logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", serverConnection.getName(), + MessageType.getString(clientMessage.getMessageType()), serverConnection.getSocketString(), cqName, cqQueryString); } @@ -96,8 +96,7 @@ public class ExecuteCQ61 extends BaseCQCommand { String err = LocalizedStrings.ExecuteCQ_SERVER_NOTIFYBYSUBSCRIPTION_MODE_IS_SET_TO_FALSE_CQ_EXECUTION_IS_NOT_SUPPORTED_IN_THIS_MODE .toLocalizedString(); - sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, - servConn); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection); return; } } @@ -113,7 +112,7 @@ public class ExecuteCQ61 extends BaseCQCommand { qService = (DefaultQueryService) crHelper.getCache().getLocalQueryService(); // Authorization check - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { query = qService.newQuery(cqQueryString); cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null); @@ -141,16 +140,16 @@ public class ExecuteCQ61 extends BaseCQCommand { // registering cq with serverConnection so that when CCP will require auth info it can access // that // registering cq auth before as possibility that you may get event - servConn.setCq(cqName, isDurable); + serverConnection.setCq(cqName, isDurable); cqQuery = (ServerCQImpl) cqServiceForExec.executeCq(cqName, cqQueryString, cqState, id, ccn, isDurable, true, regionDataPolicyPartBytes[0], null); } catch (CqException cqe) { - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn); - servConn.removeCq(cqName, isDurable); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection); + serverConnection.removeCq(cqName, isDurable); return; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); - servConn.removeCq(cqName, isDurable); + writeChunkedException(clientMessage, e, serverConnection); + serverConnection.removeCq(cqName, isDurable); return; } @@ -158,7 +157,7 @@ public class ExecuteCQ61 extends BaseCQCommand { boolean sendResults = false; boolean successQuery = false; - if (msg.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) { + if (clientMessage.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) { sendResults = true; } @@ -173,8 +172,8 @@ public class ExecuteCQ61 extends BaseCQCommand { cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null); } ((DefaultQuery) query).setIsCqQuery(true); - successQuery = processQuery(msg, query, cqQueryString, cqRegionNames, start, cqQuery, - executeCQContext, servConn, sendResults); + successQuery = processQuery(clientMessage, query, cqQueryString, cqRegionNames, start, cqQuery, + executeCQContext, serverConnection, sendResults); // Update the CQ statistics. @@ -203,12 +202,12 @@ public class ExecuteCQ61 extends BaseCQCommand { // Send OK to client sendCqResponse(MessageType.REPLY, LocalizedStrings.ExecuteCQ_CQ_CREATED_SUCCESSFULLY.toLocalizedString(), - msg.getTransactionId(), null, servConn); + clientMessage.getTransactionId(), null, serverConnection); long start2 = DistributionStats.getStatTime(); stats.incProcessCreateCqTime(start2 - oldstart); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java index 69be347..b1faeee 100644 --- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java +++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetCQStats.java @@ -36,32 +36,32 @@ public class GetCQStats extends BaseCQCommand { private GetCQStats() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); final boolean isDebugEnabled = logger.isDebugEnabled(); if (isDebugEnabled) { - logger.debug("{}: Received close all client CQs request from {}", servConn.getName(), - servConn.getSocketString()); + logger.debug("{}: Received close all client CQs request from {}", serverConnection.getName(), + serverConnection.getSocketString()); } // Retrieve the data from the message parts - String cqName = msg.getPart(0).getString(); + String cqName = clientMessage.getPart(0).getString(); if (isDebugEnabled) { - logger.debug("{}: Received close CQ request from {} cqName: {}", servConn.getName(), - servConn.getSocketString(), cqName); + logger.debug("{}: Received close CQ request from {} cqName: {}", serverConnection.getName(), + serverConnection.getSocketString(), cqName); } // Process the query request if (cqName == null) { String err = "The cqName for the cq stats request is null"; - sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, servConn); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection); return; } @@ -74,13 +74,12 @@ public class GetCQStats extends BaseCQCommand { cqService.start(); } catch (Exception e) { String err = "Exception while Getting the CQ Statistics. "; - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg.getTransactionId(), e, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, clientMessage.getTransactionId(), e, serverConnection); return; } // Send OK to client - sendCqResponse(MessageType.REPLY, "cq stats sent successfully.", msg.getTransactionId(), null, - servConn); - servConn.setAsTrue(RESPONDED); + sendCqResponse(MessageType.REPLY, "cq stats sent successfully.", clientMessage.getTransactionId(), null, serverConnection); + serverConnection.setAsTrue(RESPONDED); { long oldStart = start; http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java index a2d201d..e39c8e1 100755 --- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java +++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetDurableCQs.java @@ -44,19 +44,19 @@ public class GetDurableCQs extends BaseCQCommand { private GetDurableCQs() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { - AcceptorImpl acceptor = servConn.getAcceptor(); - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - ClientProxyMembershipID id = servConn.getProxyID(); - CacheServerStats stats = servConn.getCacheServerStats(); + AcceptorImpl acceptor = serverConnection.getAcceptor(); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + ClientProxyMembershipID id = serverConnection.getProxyID(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); if (logger.isDebugEnabled()) { - logger.debug("{}: Received {} request from {}", servConn.getName(), - MessageType.getString(msg.getMessageType()), servConn.getSocketString()); + logger.debug("{}: Received {} request from {}", serverConnection.getName(), + MessageType.getString(clientMessage.getMessageType()), serverConnection.getSocketString()); } DefaultQueryService qService = null; @@ -68,7 +68,7 @@ public class GetDurableCQs extends BaseCQCommand { this.securityService.authorizeClusterRead(); // Authorization check - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { authzRequest.getDurableCQsAuthorize(); } @@ -76,34 +76,34 @@ public class GetDurableCQs extends BaseCQCommand { cqServiceForExec = qService.getCqService(); List<String> durableCqs = cqServiceForExec.getAllDurableClientCqs(id); - ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + ChunkedMessage chunkedResponseMsg = serverConnection.getChunkedResponseMessage(); chunkedResponseMsg.setMessageType(MessageType.RESPONSE); - chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.setTransactionId(clientMessage.getTransactionId()); chunkedResponseMsg.sendHeader(); - List durableCqList = new ArrayList(maximumChunkSize); + List durableCqList = new ArrayList(MAXIMUM_CHUNK_SIZE); final boolean isTraceEnabled = logger.isTraceEnabled(); for (Iterator it = durableCqs.iterator(); it.hasNext();) { Object durableCqName = it.next(); durableCqList.add(durableCqName); if (isTraceEnabled) { - logger.trace("{}: getDurableCqsResponse <{}>; list size was {}", servConn.getName(), + logger.trace("{}: getDurableCqsResponse <{}>; list size was {}", serverConnection.getName(), durableCqName, durableCqList.size()); } - if (durableCqList.size() == maximumChunkSize) { + if (durableCqList.size() == MAXIMUM_CHUNK_SIZE) { // Send the chunk and clear the list - sendDurableCqsResponseChunk(durableCqList, false, servConn); + sendDurableCqsResponseChunk(durableCqList, false, serverConnection); durableCqList.clear(); } } // Send the last chunk even if the list is of zero size. - sendDurableCqsResponseChunk(durableCqList, true, servConn); + sendDurableCqsResponseChunk(durableCqList, true, serverConnection); } catch (CqException cqe) { - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection); return; } catch (Exception e) { - writeChunkedException(msg, e, false, servConn); + writeChunkedException(clientMessage, e, serverConnection); return; } } @@ -114,7 +114,7 @@ public class GetDurableCQs extends BaseCQCommand { chunkedResponseMsg.setNumberOfParts(1); chunkedResponseMsg.setLastChunk(lastChunk); - chunkedResponseMsg.addObjPart(list, zipValues); + chunkedResponseMsg.addObjPart(list, false); if (logger.isDebugEnabled()) { logger.debug("{}: Sending {} durableCQs response chunk{}", servConn.getName(), http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java index a8fec9f..5393e81 100644 --- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java +++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/MonitorCQ.java @@ -36,39 +36,38 @@ public class MonitorCQ extends BaseCQCommand { private MonitorCQ() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); - int op = msg.getPart(0).getInt(); + int op = clientMessage.getPart(0).getInt(); if (op < 1) { // This should have been taken care at the client - remove? String err = LocalizedStrings.MonitorCQ__0_THE_MONITORCQ_OPERATION_IS_INVALID - .toLocalizedString(servConn.getName()); - sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, servConn); + .toLocalizedString(serverConnection.getName()); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection); return; } String regionName = null; - if (msg.getNumberOfParts() == 2) { + if (clientMessage.getNumberOfParts() == 2) { // This will be enable/disable on region. - regionName = msg.getPart(1).getString(); + regionName = clientMessage.getPart(1).getString(); if (regionName == null) { // This should have been taken care at the client - remove? String err = LocalizedStrings.MonitorCQ__0_A_NULL_REGION_NAME_WAS_PASSED_FOR_MONITORCQ_OPERATION - .toLocalizedString(servConn.getName()); - sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, - servConn); + .toLocalizedString(serverConnection.getName()); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection); return; } } if (logger.isDebugEnabled()) { - logger.debug("{}: Received MonitorCq request from {} op: {}{}", servConn.getName(), - servConn.getSocketString(), op, (regionName != null) ? " RegionName: " + regionName : ""); + logger.debug("{}: Received MonitorCq request from {} op: {}{}", serverConnection.getName(), + serverConnection.getSocketString(), op, (regionName != null) ? " RegionName: " + regionName : ""); } this.securityService.authorizeClusterRead(); @@ -85,12 +84,12 @@ public class MonitorCQ extends BaseCQCommand { throw new CqException( LocalizedStrings.CqService_INVALID_CQ_MONITOR_REQUEST_RECEIVED.toLocalizedString()); } catch (CqException cqe) { - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection); return; } catch (Exception e) { String err = LocalizedStrings.MonitorCQ_EXCEPTION_WHILE_HANDLING_THE_MONITOR_REQUEST_OP_IS_0 .toLocalizedString(Integer.valueOf(op)); - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg.getTransactionId(), e, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, clientMessage.getTransactionId(), e, serverConnection); return; } } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java index 94304d3..070cb04 100644 --- a/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java +++ b/geode-cq/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/StopCQ.java @@ -44,30 +44,30 @@ public class StopCQ extends BaseCQCommand { private StopCQ() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - ClientProxyMembershipID id = servConn.getProxyID(); - CacheServerStats stats = servConn.getCacheServerStats(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + ClientProxyMembershipID id = serverConnection.getProxyID(); + CacheServerStats stats = serverConnection.getCacheServerStats(); // Based on MessageType.QUERY // Added by Rao 2/1/2007 - servConn.setAsTrue(REQUIRES_RESPONSE); - servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - String cqName = msg.getPart(0).getString(); + String cqName = clientMessage.getPart(0).getString(); if (logger.isDebugEnabled()) { - logger.debug("{}: Received stop CQ request from {} cqName: {}", servConn.getName(), - servConn.getSocketString(), cqName); + logger.debug("{}: Received stop CQ request from {} cqName: {}", serverConnection.getName(), + serverConnection.getSocketString(), cqName); } // Process the query request if (cqName == null) { String err = LocalizedStrings.StopCQ_THE_CQNAME_FOR_THE_CQ_STOP_REQUEST_IS_NULL.toLocalizedString(); - sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg.getTransactionId(), null, servConn); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, clientMessage.getTransactionId(), null, serverConnection); return; } @@ -86,7 +86,7 @@ public class StopCQ extends BaseCQCommand { this.securityService.authorizeDataManage(); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { String queryStr = null; Set cqRegionNames = null; @@ -100,23 +100,23 @@ public class StopCQ extends BaseCQCommand { } cqService.stopCq(cqName, id); if (cqQuery != null) - servConn.removeCq(cqName, cqQuery.isDurable()); + serverConnection.removeCq(cqName, cqQuery.isDurable()); } catch (CqException cqe) { - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), cqe, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", clientMessage.getTransactionId(), cqe, serverConnection); return; } catch (Exception e) { String err = LocalizedStrings.StopCQ_EXCEPTION_WHILE_STOPPING_CQ_NAMED_0.toLocalizedString(cqName); - sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg.getTransactionId(), e, servConn); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, clientMessage.getTransactionId(), e, serverConnection); return; } // Send OK to client sendCqResponse(MessageType.REPLY, - LocalizedStrings.StopCQ_CQ_STOPPED_SUCCESSFULLY.toLocalizedString(), msg.getTransactionId(), - null, servConn); + LocalizedStrings.StopCQ_CQ_STOPPED_SUCCESSFULLY.toLocalizedString(), clientMessage.getTransactionId(), + null, serverConnection); - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); { long oldStart = start;