http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java index 4bd4970..5631184 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java @@ -17,9 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets; import static org.apache.geode.distributed.ConfigurationProperties.*; import java.io.BufferedOutputStream; -import java.io.DataInput; import java.io.DataInputStream; -import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.lang.reflect.Method; @@ -70,12 +68,7 @@ import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.distributed.internal.MessageWithReply; -import org.apache.geode.distributed.internal.ReplyMessage; -import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.internal.ClassLoadUtil; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.statistics.DummyStatisticsFactory; @@ -127,6 +120,22 @@ import org.apache.geode.security.AuthenticationRequiredException; public class CacheClientNotifier { private static final Logger logger = 
LogService.getLogger(); + /** + * The size of the server-to-client communication socket buffers. This can be modified using the + * BridgeServer.SOCKET_BUFFER_SIZE system property. + */ + private static final int socketBufferSize = + Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768); + + private static final long CLIENT_PING_TASK_PERIOD = + Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000); + + /** + * package-private to avoid synthetic accessor + */ + static final long CLIENT_PING_TASK_COUNTER = + Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3); + private static volatile CacheClientNotifier ccnSingleton; /** @@ -149,20 +158,6 @@ public class CacheClientNotifier { private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = new HashSet<>(); - /** - * The GemFire {@code InternalCache}. Note that since this is a singleton class you should not use - * a direct reference to cache in CacheClientNotifier code. Instead, you should always use - * {@code getCache()} - */ - private InternalCache cache; // TODO: fix synchronization of cache - - private InternalLogWriter logWriter; - - /** - * The GemFire security {@code LogWriter} - */ - private InternalLogWriter securityLogWriter; - /** the maximum number of messages that can be enqueued in a client-queue. */ private final int maximumMessageCount; @@ -179,24 +174,9 @@ public class CacheClientNotifier { private final CacheServerStats acceptorStats; /** - * haContainer can hold either the name of the client-messages-region (in case of eviction - * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In - * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value. - */ - private volatile HAContainerWrapper haContainer; - - /** - * The size of the server-to-client communication socket buffers. 
This can be modified using the - * BridgeServer.SOCKET_BUFFER_SIZE system property. - */ - private static final int socketBufferSize = - Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768); - - /** * The statistics for this notifier */ - final CacheClientNotifierStats statistics; // TODO: pass statistics into CacheClientProxy then - // make private + final CacheClientNotifierStats statistics; // TODO: pass statistics into CacheClientProxy /** * The {@code InterestRegistrationListener} instances registered in this VM. This is used when @@ -209,55 +189,41 @@ public class CacheClientNotifier { * provide a read-only {@code Set} of listeners. */ private final Set readableInterestRegistrationListeners = - Collections.unmodifiableSet(this.writableInterestRegistrationListeners); - - /** - * System property name for indicating how much frequently the "Queue full" message should be - * logged. - */ - private static final String MAX_QUEUE_LOG_FREQUENCY = - DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit"; + Collections.unmodifiableSet(this.writableInterestRegistrationListeners); - public static final long DEFAULT_LOG_FREQUENCY = 1000; + private final Map<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<>(); - private static final String EVENT_ENQUEUE_WAIT_TIME_NAME = - DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME"; + private final Object lockIsCompiledQueryCleanupThreadStarted = new Object(); - private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100; + private final SocketCloser socketCloser; - /** - * System property value denoting the time in milliseconds. Any thread putting an event into a - * subscription queue, which is full, will wait this much time for the queue to make space. It'll - * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See - * #51400. 
- */ - public static int eventEnqueueWaitTime; // TODO: encapsulate eventEnqueueWaitTime + /** package-private to avoid synthetic accessor */ + final Set blackListedClients = new CopyOnWriteArraySet(); /** - * The frequency of logging the "Queue full" message. + * haContainer can hold either the name of the client-messages-region (in case of eviction + * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In + * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value. */ - private long logFrequency = DEFAULT_LOG_FREQUENCY; - - private final Map<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<>(); + private volatile HAContainerWrapper haContainer; private volatile boolean isCompiledQueryCleanupThreadStarted = false; - private final Object lockIsCompiledQueryCleanupThreadStarted = new Object(); - - private SystemTimer.SystemTimerTask clientPingTask; // TODO: fix synchronization of clientPingTask - - private final SocketCloser socketCloser; + /** + * The GemFire {@code InternalCache}. Note that since this is a singleton class you should not use + * a direct reference to cache in CacheClientNotifier code. 
Instead, you should always use + * {@code getCache()} + */ + private InternalCache cache; // TODO: fix synchronization of cache - private static final long CLIENT_PING_TASK_PERIOD = - Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000); + private InternalLogWriter logWriter; /** - * package-private to avoid synthetic accessor + * The GemFire security {@code LogWriter} */ - static final long CLIENT_PING_TASK_COUNTER = - Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3); + private InternalLogWriter securityLogWriter; - private final Set blackListedClients = new CopyOnWriteArraySet(); + private SystemTimer.SystemTimerTask clientPingTask; // TODO: fix synchronization of clientPingTask /** * Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance. @@ -319,21 +285,6 @@ public class CacheClientNotifier { } this.statistics = new CacheClientNotifierStats(factory); - try { - this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); - if (this.logFrequency <= 0) { - this.logFrequency = DEFAULT_LOG_FREQUENCY; - } - } catch (Exception e) { - this.logFrequency = DEFAULT_LOG_FREQUENCY; - } - - eventEnqueueWaitTime = - Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME); - if (eventEnqueueWaitTime < 0) { - eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME; - } - // Schedule task to periodically ping clients. scheduleClientPingTask(); } @@ -923,7 +874,7 @@ public class CacheClientNotifier { * in it that determines which clients will receive the event. 
*/ public static void notifyClients(InternalCacheEvent event) { - CacheClientNotifier instance = ccnSingleton; + CacheClientNotifier instance = getInstance(); if (instance != null) { instance.singletonNotifyClients(event, null); } @@ -935,7 +886,7 @@ public class CacheClientNotifier { */ public static void notifyClients(InternalCacheEvent event, ClientUpdateMessage clientUpdateMessage) { - CacheClientNotifier instance = ccnSingleton; + CacheClientNotifier instance = getInstance(); if (instance != null) { instance.singletonNotifyClients(event, clientUpdateMessage); } @@ -1094,7 +1045,7 @@ public class CacheClientNotifier { * interest established, or override the isClientInterested method to implement its own routing */ public static void routeClientMessage(Conflatable clientMessage) { - CacheClientNotifier instance = ccnSingleton; + CacheClientNotifier instance = getInstance(); if (instance != null) { // ok to use keySet here because all we do is call getClientProxy with these keys instance.singletonRouteClientMessage(clientMessage, instance.clientProxies.keySet()); @@ -1106,7 +1057,7 @@ public class CacheClientNotifier { */ static void routeSingleClientMessage(ClientUpdateMessage clientMessage, ClientProxyMembershipID clientProxyMembershipId) { - CacheClientNotifier instance = ccnSingleton; + CacheClientNotifier instance = getInstance(); if (instance != null) { instance.singletonRouteClientMessage(clientMessage, Collections.singleton(clientProxyMembershipId)); @@ -1589,7 +1540,7 @@ public class CacheClientNotifier { } } - if (noActiveServer() && ccnSingleton != null) { + if (noActiveServer() && getInstance() != null) { ccnSingleton = null; if (this.haContainer != null) { this.haContainer.cleanUp(); @@ -1814,7 +1765,7 @@ public class CacheClientNotifier { /** * Shuts down durable client proxy */ - public boolean closeDurableClientProxy(String durableClientId) throws CacheException { + public boolean closeDurableClientProxy(String durableClientId) { CacheClientProxy 
ccp = getClientProxy(durableClientId); if (ccp == null) { return false; @@ -1828,8 +1779,7 @@ public class CacheClientNotifier { if (logger.isDebugEnabled()) { logger.debug("Cannot close running durable client: {}", durableClientId); } - // TODO: never throw an anonymous inner class - throw new CacheException("Cannot close a running durable client : " + durableClientId) {}; + throw new IllegalStateException("Cannot close a running durable client : " + durableClientId); } } @@ -2114,10 +2064,6 @@ public class CacheClientNotifier { CLIENT_PING_TASK_PERIOD, CLIENT_PING_TASK_PERIOD); } - public long getLogFrequency() { - return this.logFrequency; - } - /** * @return the haContainer */ @@ -2182,93 +2128,4 @@ public class CacheClientNotifier { } } - /** - * Static inner-class ServerInterestRegistrationMessage - * <p> - * this message is used to send interest registration to another server. Since interest - * registration performs a state-flush operation this message must not transmitted on an ordered - * socket - */ - public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage - implements MessageWithReply { - - ClientProxyMembershipID clientId; - ClientInterestMessageImpl clientMessage; - int processorId; - - ServerInterestRegistrationMessage(ClientProxyMembershipID clientID, - ClientInterestMessageImpl msg) { - this.clientId = clientID; - this.clientMessage = msg; - } - - public ServerInterestRegistrationMessage() { - // nothing - } - - static void sendInterestChange(DM dm, ClientProxyMembershipID clientID, - ClientInterestMessageImpl msg) { - ServerInterestRegistrationMessage registrationMessage = - new ServerInterestRegistrationMessage(clientID, msg); - Set recipients = dm.getOtherDistributionManagerIds(); - registrationMessage.setRecipients(recipients); - ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients); - registrationMessage.processorId = rp.getProcessorId(); - dm.putOutgoing(registrationMessage); - try { - 
rp.waitForReplies(); - } catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - } - } - - @Override - protected void process(DistributionManager dm) { - // Get the proxy for the proxy id - try { - CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance(); - if (clientNotifier != null) { - CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId); - // If this VM contains a proxy for the requested proxy id, forward the - // message on to the proxy for processing - if (proxy != null) { - proxy.processInterestMessage(this.clientMessage); - } - } - } finally { - ReplyMessage reply = new ReplyMessage(); - reply.setProcessorId(this.processorId); - reply.setRecipient(getSender()); - try { - dm.putOutgoing(reply); - } catch (CancelException ignore) { - // can't send a reply, so ignore the exception - } - } - } - - @Override - public int getDSFID() { - return SERVER_INTEREST_REGISTRATION_MESSAGE; - } - - @Override - public void toData(DataOutput out) throws IOException { - super.toData(out); - out.writeInt(this.processorId); - InternalDataSerializer.invokeToData(this.clientId, out); - InternalDataSerializer.invokeToData(this.clientMessage, out); - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - super.fromData(in); - this.processorId = in.readInt(); - this.clientId = new ClientProxyMembershipID(); - InternalDataSerializer.invokeFromData(this.clientId, in); - this.clientMessage = new ClientInterestMessageImpl(); - InternalDataSerializer.invokeFromData(this.clientMessage, in); - } - } } -
http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java new file mode 100644 index 0000000..5860982 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.geode.internal.cache.tier.sockets; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +import org.apache.geode.CancelException; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; +import org.apache.geode.distributed.internal.MessageWithReply; +import org.apache.geode.distributed.internal.ReplyMessage; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.internal.InternalDataSerializer; + +/** + * Send interest registration to another server. Since interest registration performs a state-flush operation, this message must not be transmitted on an ordered socket. + * <p> + * Extracted from CacheClientNotifier + */ +public class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage + implements MessageWithReply { + + private ClientProxyMembershipID clientId; + private ClientInterestMessageImpl clientMessage; + private int processorId; + + ServerInterestRegistrationMessage(ClientProxyMembershipID clientId, ClientInterestMessageImpl clientInterestMessage) { + this.clientId = clientId; + this.clientMessage = clientInterestMessage; + } + + public ServerInterestRegistrationMessage() { + // deserializing in fromData + } + + static void sendInterestChange(DM dm, ClientProxyMembershipID clientId, ClientInterestMessageImpl clientInterestMessage) { + ServerInterestRegistrationMessage registrationMessage = + new ServerInterestRegistrationMessage(clientId, clientInterestMessage); + + Set recipients = dm.getOtherDistributionManagerIds(); + registrationMessage.setRecipients(recipients); + + ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, recipients); + registrationMessage.processorId = replyProcessor.getProcessorId(); + + dm.putOutgoing(registrationMessage); + + try { + 
replyProcessor.waitForReplies(); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + + @Override + protected void process(DistributionManager dm) { + // Get the proxy for the proxy id + try { + CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance(); + if (clientNotifier != null) { + CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId); + // If this VM contains a proxy for the requested proxy id, forward the + // message on to the proxy for processing + if (proxy != null) { + proxy.processInterestMessage(this.clientMessage); + } + } + } finally { + ReplyMessage reply = new ReplyMessage(); + reply.setProcessorId(this.processorId); + reply.setRecipient(getSender()); + try { + dm.putOutgoing(reply); + } catch (CancelException ignore) { + // can't send a reply, so ignore the exception + } + } + } + + @Override + public int getDSFID() { + return SERVER_INTEREST_REGISTRATION_MESSAGE; + } + + @Override + public void toData(DataOutput out) throws IOException { + super.toData(out); + out.writeInt(this.processorId); + InternalDataSerializer.invokeToData(this.clientId, out); + InternalDataSerializer.invokeToData(this.clientMessage, out); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + super.fromData(in); + this.processorId = in.readInt(); + this.clientId = new ClientProxyMembershipID(); + InternalDataSerializer.invokeFromData(this.clientId, in); + this.clientMessage = new ClientInterestMessageImpl(); + InternalDataSerializer.invokeFromData(this.clientMessage, in); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java index 1b599e9..fb0bd50 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java @@ -39,29 +39,29 @@ public class AddPdxEnum extends BaseCommand { private AddPdxEnum() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException { - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); if (logger.isDebugEnabled()) { logger.debug("{}: Received get pdx id for enum request ({} parts) from {}", - servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString()); + serverConnection.getName(), clientMessage.getNumberOfParts(), serverConnection.getSocketString()); } - int noOfParts = msg.getNumberOfParts(); + int noOfParts = clientMessage.getNumberOfParts(); - EnumInfo enumInfo = (EnumInfo) msg.getPart(0).getObject(); - int enumId = msg.getPart(1).getInt(); + EnumInfo enumInfo = (EnumInfo) clientMessage.getPart(0).getObject(); + int enumId = clientMessage.getPart(1).getInt(); try { - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); TypeRegistry registry = cache.getPdxRegistry(); registry.addRemoteEnum(enumId, enumInfo); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); } } 
http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java index 9b8302e..10a065c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java @@ -39,33 +39,33 @@ public class AddPdxType extends BaseCommand { private AddPdxType() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException { - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); if (logger.isDebugEnabled()) { logger.debug("{}: Received get pdx id for type request ({} parts) from {}", - servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString()); + serverConnection.getName(), clientMessage.getNumberOfParts(), serverConnection.getSocketString()); } - int noOfParts = msg.getNumberOfParts(); + int noOfParts = clientMessage.getNumberOfParts(); - PdxType type = (PdxType) msg.getPart(0).getObject(); - int typeId = msg.getPart(1).getInt(); + PdxType type = (PdxType) clientMessage.getPart(0).getObject(); + int typeId = clientMessage.getPart(1).getInt(); // The native client needs this line // because it doesn't set the type id on the // client side. 
type.setTypeId(typeId); try { - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); TypeRegistry registry = cache.getPdxRegistry(); registry.addRemoteType(typeId, type); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); } } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java index 959430c..c9c5a9d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java @@ -47,15 +47,15 @@ public class ClearRegion extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, callbackArgPart = null; String regionName = null; Object callbackArg = null; Part eventPart = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); + 
serverConnection.setAsTrue(REQUIRES_RESPONSE); { long oldStart = start; @@ -63,36 +63,36 @@ public class ClearRegion extends BaseCommand { stats.incReadClearRegionRequestTime(start - oldStart); } // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); - eventPart = msg.getPart(1); + regionNamePart = clientMessage.getPart(0); + eventPart = clientMessage.getPart(1); // callbackArgPart = null; (redundant assignment) - if (msg.getNumberOfParts() > 2) { - callbackArgPart = msg.getPart(2); + if (clientMessage.getNumberOfParts() > 2) { + callbackArgPart = clientMessage.getPart(2); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } regionName = regionNamePart.getString(); if (logger.isDebugEnabled()) { - logger.debug(servConn.getName() + ": Received clear region request (" + msg.getPayloadLength() - + " bytes) from " + servConn.getSocketString() + " for region " + regionName); + logger.debug(serverConnection.getName() + ": Received clear region request (" + clientMessage.getPayloadLength() + + " bytes) from " + serverConnection.getSocketString() + " for region " + regionName); } // Process the clear region request if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ClearRegion_0_THE_INPUT_REGION_NAME_FOR_THE_CLEAR_REGION_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); String errMessage = LocalizedStrings.ClearRegion_THE_INPUT_REGION_NAME_FOR_THE_CLEAR_REGION_REQUEST_IS_NULL .toLocalizedString(); - writeErrorResponse(msg, MessageType.CLEAR_REGION_DATA_ERROR, errMessage, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.CLEAR_REGION_DATA_ERROR, errMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -100,35 +100,35 @@ 
public class ClearRegion extends BaseCommand { if (region == null) { String reason = LocalizedStrings.ClearRegion_WAS_NOT_FOUND_DURING_CLEAR_REGION_REGUEST .toLocalizedString(); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); - EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId); + EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId); try { // Clear the region this.securityService.authorizeRegionWrite(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { RegionClearOperationContext clearContext = authzRequest.clearAuthorize(regionName, callbackArg); callbackArg = clearContext.getCallbackArg(); } - region.basicBridgeClear(callbackArg, servConn.getProxyID(), + region.basicBridgeClear(callbackArg, serverConnection.getProxyID(), true /* boolean from cache Client */, eventId); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // If an exception occurs during the clear, preserve the connection - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -138,10 +138,10 @@ public class ClearRegion extends BaseCommand { start = DistributionStats.getStatTime(); stats.incProcessClearRegionTime(start - oldStart); } - writeReply(msg, 
servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug(servConn.getName() + ": Sent clear region response for region " + regionName); + logger.debug(serverConnection.getName() + ": Sent clear region response for region " + regionName); } stats.incWriteClearRegionResponseTime(DistributionStats.getStatTime() - start); } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java index d50e522..053ef8a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java @@ -35,34 +35,34 @@ public class ClientReady extends BaseCommand { private ClientReady() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - CacheServerStats stats = servConn.getCacheServerStats(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { + CacheServerStats stats = serverConnection.getCacheServerStats(); { long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadClientReadyRequestTime(start - oldStart); } try { - String clientHost = servConn.getSocketHost(); - int clientPort = servConn.getSocketPort(); + String clientHost = serverConnection.getSocketHost(); + int clientPort = serverConnection.getSocketPort(); if (logger.isDebugEnabled()) { logger.debug("{}: Received client ready request ({} bytes) from {} on {}:{}", - 
servConn.getName(), msg.getPayloadLength(), servConn.getProxyID(), clientHost, + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getProxyID(), clientHost, clientPort); } - servConn.getAcceptor().getCacheClientNotifier().readyForEvents(servConn.getProxyID()); + serverConnection.getAcceptor().getCacheClientNotifier().readyForEvents(serverConnection.getProxyID()); long oldStart = start; start = DistributionStats.getStatTime(); stats.incProcessClientReadyTime(start - oldStart); - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug(servConn.getName() + ": Processed client ready request from " - + servConn.getProxyID() + " on " + clientHost + ":" + clientPort); + logger.debug(serverConnection.getName() + ": Processed client ready request from " + + serverConnection.getProxyID() + " on " + clientHost + ":" + clientPort); } } finally { stats.incWriteClientReadyResponseTime(DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java index 66045aa..378a322 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java @@ -39,43 +39,43 @@ public class CloseConnection extends BaseCommand { private CloseConnection() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - 
CacheServerStats stats = servConn.getCacheServerStats(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { + CacheServerStats stats = serverConnection.getCacheServerStats(); long oldStart = start; - boolean respondToClient = servConn.getClientVersion().compareTo(Version.GFE_90) >= 0; + boolean respondToClient = serverConnection.getClientVersion().compareTo(Version.GFE_90) >= 0; start = DistributionStats.getStatTime(); stats.incReadCloseConnectionRequestTime(start - oldStart); if (respondToClient) { // newer clients will wait for a response or EOFException - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); } try { - servConn.setClientDisconnectCleanly(); - String clientHost = servConn.getSocketHost(); - int clientPort = servConn.getSocketPort(); + serverConnection.setClientDisconnectCleanly(); + String clientHost = serverConnection.getSocketHost(); + int clientPort = serverConnection.getSocketPort(); if (logger.isDebugEnabled()) { - logger.debug("{}: Received close request ({} bytes) from {}:{}", servConn.getName(), - msg.getPayloadLength(), clientHost, clientPort); + logger.debug("{}: Received close request ({} bytes) from {}:{}", serverConnection.getName(), + clientMessage.getPayloadLength(), clientHost, clientPort); } - Part keepalivePart = msg.getPart(0); + Part keepalivePart = clientMessage.getPart(0); byte[] keepaliveByte = keepalivePart.getSerializedForm(); boolean keepalive = (keepaliveByte == null || keepaliveByte[0] == 0) ? 
false : true; - servConn.getAcceptor().getCacheClientNotifier().setKeepAlive(servConn.getProxyID(), + serverConnection.getAcceptor().getCacheClientNotifier().setKeepAlive(serverConnection.getProxyID(), keepalive); if (logger.isDebugEnabled()) { - logger.debug("{}: Processed close request from {}:{}, keepAlive: {}", servConn.getName(), + logger.debug("{}: Processed close request from {}:{}, keepAlive: {}", serverConnection.getName(), clientHost, clientPort, keepalive); } } finally { if (respondToClient) { - writeReply(msg, servConn); + writeReply(clientMessage, serverConnection); } - servConn.setFlagProcessMessagesAsFalse(); + serverConnection.setFlagProcessMessagesAsFalse(); stats.incProcessCloseConnectionTime(DistributionStats.getStatTime() - start); } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java index 55ef09b..b2bba4f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java @@ -50,12 +50,12 @@ public class CommitCommand extends BaseCommand { private CommitCommand() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - servConn.setAsTrue(REQUIRES_RESPONSE); - TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { + serverConnection.setAsTrue(REQUIRES_RESPONSE); + TXManagerImpl txMgr = (TXManagerImpl) 
serverConnection.getCache().getCacheTransactionManager(); InternalDistributedMember client = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); - int uniqId = msg.getTransactionId(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); + int uniqId = clientMessage.getTransactionId(); TXId txId = new TXId(client, uniqId); TXCommitMessage commitMsg = null; if (txMgr.isHostedTxRecentlyCompleted(txId)) { @@ -64,11 +64,11 @@ public class CommitCommand extends BaseCommand { logger.debug("TX: returning a recently committed txMessage for tx: {}", txId); } if (!txMgr.isExceptionToken(commitMsg)) { - writeCommitResponse(commitMsg, msg, servConn); + writeCommitResponse(commitMsg, clientMessage, serverConnection); commitMsg.setClientVersion(null); // fixes bug 46529 - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } else { - sendException(msg, servConn, txMgr.getExceptionForToken(commitMsg, txId)); + sendException(clientMessage, serverConnection, txMgr.getExceptionForToken(commitMsg, txId)); } txMgr.removeHostedTXState(txId); return; @@ -87,10 +87,10 @@ public class CommitCommand extends BaseCommand { txMgr.commit(); commitMsg = txProxy.getCommitMessage(); - writeCommitResponse(commitMsg, msg, servConn); - servConn.setAsTrue(RESPONDED); + writeCommitResponse(commitMsg, clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { - sendException(msg, servConn, e); + sendException(clientMessage, serverConnection, e); } finally { if (txId != null) { txMgr.removeHostedTXState(txId); @@ -115,7 +115,7 @@ public class CommitCommand extends BaseCommand { if (response != null) { response.setClientVersion(servConn.getClientVersion()); } - responseMsg.addObjPart(response, zipValues); + responseMsg.addObjPart(response, false); servConn.getCache().getCancelCriterion().checkCancelInProgress(null); if (logger.isDebugEnabled()) { logger.debug("TX: sending a nonNull response 
for transaction: {}", http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java index c1b67e1..50d1197 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java @@ -51,34 +51,34 @@ public class ContainsKey extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { Part regionNamePart = null; Part keyPart = null; String regionName = null; Object key = null; - CacheServerStats stats = servConn.getCacheServerStats(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); { long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadContainsKeyRequestTime(start - oldStart); } // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received containsKey request ({} bytes) from {} for 
region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key); } // Process the containsKey request @@ -87,47 +87,47 @@ public class ContainsKey extends BaseCommand { if (key == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ContainsKey_0_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.ContainsKey_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL .toLocalizedString(); } if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ContainsKey_0_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.ContainsKey_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL .toLocalizedString(); } - writeErrorResponse(msg, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.ContainsKey_WAS_NOT_FOUND_DURING_CONTAINSKEY_REQUEST.toLocalizedString(); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { this.securityService.authorizeRegionRead(regionName, key.toString()); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, 
false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { try { authzRequest.containsKeyAuthorize(regionName, key); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -140,10 +140,10 @@ public class ContainsKey extends BaseCommand { start = DistributionStats.getStatTime(); stats.incProcessContainsKeyTime(start - oldStart); } - writeContainsKeyResponse(containsKey, msg, servConn); - servConn.setAsTrue(RESPONDED); + writeContainsKeyResponse(containsKey, clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Sent containsKey response for region {} key {}", servConn.getName(), + logger.debug("{}: Sent containsKey response for region {} key {}", serverConnection.getName(), regionName, key); } stats.incWriteContainsKeyResponseTime(DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java index dc8f9eb..53bb414 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java @@ -55,34 +55,34 @@ public class ContainsKey66 extends BaseCommand { } @Override - public void cmdExecute(Message msg, 
ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { Part regionNamePart = null, keyPart = null; String regionName = null; Object key = null; ContainsKeyOp.MODE mode; - CacheServerStats stats = servConn.getCacheServerStats(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); { long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadContainsKeyRequestTime(start - oldStart); } // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); - mode = ContainsKeyOp.MODE.values()[(msg.getPart(2).getInt())]; + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); + mode = ContainsKeyOp.MODE.values()[(clientMessage.getPart(2).getInt())]; regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received containsKey request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key); } // Process the containsKey request @@ -91,46 +91,46 @@ public class ContainsKey66 extends BaseCommand { if (key == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ContainsKey_0_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.ContainsKey_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL .toLocalizedString(); } 
if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ContainsKey_0_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.ContainsKey_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL .toLocalizedString(); } - writeErrorResponse(msg, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.ContainsKey_WAS_NOT_FOUND_DURING_CONTAINSKEY_REQUEST.toLocalizedString(); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { this.securityService.authorizeRegionRead(regionName, key.toString()); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { try { authzRequest.containsKeyAuthorize(regionName, key); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -157,10 +157,10 @@ public class ContainsKey66 extends BaseCommand { start = DistributionStats.getStatTime(); 
stats.incProcessContainsKeyTime(start - oldStart); } - writeContainsKeyResponse(containsKey, msg, servConn); - servConn.setAsTrue(RESPONDED); + writeContainsKeyResponse(containsKey, clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Sent containsKey response for region {} key {}", servConn.getName(), + logger.debug("{}: Sent containsKey response for region {} key {}", serverConnection.getName(), regionName, key); } stats.incWriteContainsKeyResponseTime(DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java index d84dc62..b7ab01b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java @@ -41,25 +41,25 @@ public class CreateRegion extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { Part regionNamePart = null; String regionName = null; - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); // bserverStats.incLong(readDestroyRequestTimeId, // DistributionStats.getStatTime() - start); // bserverStats.incInt(destroyRequestsId, 1); // start = DistributionStats.getStatTime(); // Retrieve the data from the message parts - Part parentRegionNamePart = msg.getPart(0); + Part parentRegionNamePart = 
clientMessage.getPart(0); String parentRegionName = parentRegionNamePart.getString(); - regionNamePart = msg.getPart(1); + regionNamePart = clientMessage.getPart(1); regionName = regionNamePart.getString(); if (logger.isDebugEnabled()) { logger.debug( "{}: Received create region request ({} bytes) from {} for parent region {} region {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), parentRegionName, + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), parentRegionName, regionName); } @@ -69,7 +69,7 @@ public class CreateRegion extends BaseCommand { if (parentRegionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.CreateRegion_0_THE_INPUT_PARENT_REGION_NAME_FOR_THE_CREATE_REGION_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.CreateRegion_THE_INPUT_PARENT_REGION_NAME_FOR_THE_CREATE_REGION_REQUEST_IS_NULL .toLocalizedString(); @@ -77,41 +77,41 @@ public class CreateRegion extends BaseCommand { if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.CreateRegion_0_THE_INPUT_REGION_NAME_FOR_THE_CREATE_REGION_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.CreateRegion_THE_INPUT_REGION_NAME_FOR_THE_CREATE_REGION_REQUEST_IS_NULL .toLocalizedString(); } - writeErrorResponse(msg, MessageType.CREATE_REGION_DATA_ERROR, errMessage, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.CREATE_REGION_DATA_ERROR, errMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - Region parentRegion = servConn.getCache().getRegion(parentRegionName); + Region parentRegion = serverConnection.getCache().getRegion(parentRegionName); if (parentRegion == null) { String reason = LocalizedStrings.CreateRegion__0_WAS_NOT_FOUND_DURING_SUBREGION_CREATION_REQUEST .toLocalizedString(parentRegionName); - 
writeRegionDestroyedEx(msg, parentRegionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, parentRegionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { this.securityService.authorizeDataManage(); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { try { authzRequest.createRegionAuthorize(parentRegionName + '/' + regionName); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -121,11 +121,11 @@ public class CreateRegion extends BaseCommand { AttributesFactory factory = new AttributesFactory(parentRegion.getAttributes()); region = parentRegion.createSubregion(regionName, factory.create()); if (logger.isDebugEnabled()) { - logger.debug("{}: Created region {}", servConn.getName(), region); + logger.debug("{}: Created region {}", serverConnection.getName(), region); } } else { if (logger.isDebugEnabled()) { - logger.debug("{}: Retrieved region {}", servConn.getName(), region); + logger.debug("{}: Retrieved region {}", serverConnection.getName(), region); } } @@ -134,11 +134,11 @@ public class CreateRegion extends BaseCommand { // NOT USING IT // bserverStats.incLong(processDestroyTimeId, // DistributionStats.getStatTime() - start); - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { logger.debug("{}: Sent create region response for parent region {} 
region {}", - servConn.getName(), parentRegionName, regionName); + serverConnection.getName(), parentRegionName, regionName); } } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java index 1497044..359e1b4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java @@ -37,14 +37,15 @@ public class Default extends BaseCommand { private Default() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException { // requiresResponse = true; NOT NEEDED... ALWAYS SEND ERROR RESPONSE logger.fatal( LocalizedMessage.create(LocalizedStrings.Default_0_UNKNOWN_MESSAGE_TYPE_1_WITH_TX_2_FROM_3, - new Object[] {servConn.getName(), MessageType.getString(msg.getMessageType()), - Integer.valueOf(msg.getTransactionId()), servConn.getSocketString()})); - writeErrorResponse(msg, MessageType.UNKNOWN_MESSAGE_TYPE_ERROR, servConn); + new Object[] { + serverConnection.getName(), MessageType.getString(clientMessage.getMessageType()), + Integer.valueOf(clientMessage.getTransactionId()), serverConnection.getSocketString()})); + writeErrorResponse(clientMessage, MessageType.UNKNOWN_MESSAGE_TYPE_ERROR, serverConnection); // responded = true; NOT NEEDED... 
ALWAYS SEND ERROR RESPONSE } http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java index 5996984..0699c8b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java @@ -48,7 +48,7 @@ public class Destroy extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long startparam) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startparam) throws IOException, InterruptedException { long start = startparam; @@ -57,8 +57,8 @@ public class Destroy extends BaseCommand { Object callbackArg = null, key = null; Part eventPart = null; StringBuffer errMessage = new StringBuffer(); - CacheServerStats stats = servConn.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + CacheServerStats stats = serverConnection.getCacheServerStats(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); { long oldStart = start; @@ -66,17 +66,17 @@ public class Destroy extends BaseCommand { stats.incReadDestroyRequestTime(start - oldStart); } // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); - eventPart = msg.getPart(2); + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); + eventPart = clientMessage.getPart(2); // callbackArgPart = null; (redundant assignment) - if (msg.getNumberOfParts() > 3) { - callbackArgPart = msg.getPart(3); + if (clientMessage.getNumberOfParts() > 3) { + callbackArgPart = clientMessage.getPart(3); try { 
callbackArg = callbackArgPart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -84,13 +84,13 @@ public class Destroy extends BaseCommand { try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received destroy request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key); } // Process the destroy request @@ -98,29 +98,29 @@ public class Destroy extends BaseCommand { if (key == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.Destroy_0_THE_INPUT_KEY_FOR_THE_DESTROY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage.append(LocalizedStrings.Destroy__THE_INPUT_KEY_FOR_THE_DESTROY_REQUEST_IS_NULL .toLocalizedString()); } if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.Destroy_0_THE_INPUT_REGION_NAME_FOR_THE_DESTROY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage .append(LocalizedStrings.Destroy__THE_INPUT_REGION_NAME_FOR_THE_DESTROY_REQUEST_IS_NULL .toLocalizedString()); } - writeErrorResponse(msg, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + 
LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.Destroy__0_WAS_NOT_FOUND_DURING_DESTROY_REQUEST .toLocalizedString(regionName); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -128,13 +128,13 @@ public class Destroy extends BaseCommand { ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); - EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId); + EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId); try { // for integrated security this.securityService.authorizeRegionWrite(regionName, key.toString()); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegionDestroyOperationContext destroyContext = @@ -146,35 +146,35 @@ public class Destroy extends BaseCommand { callbackArg = destroyContext.getCallbackArg(); } } - region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), true, + region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), true, new EventIDHolder(eventId)); - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); } catch (EntryNotFoundException e) { // Don't send an exception back to the client if this // exception happens. Just log it and continue. 
logger.info(LocalizedMessage.create( LocalizedStrings.Destroy_0_DURING_ENTRY_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1, - new Object[] {servConn.getName(), key})); + new Object[] { serverConnection.getName(), key})); } catch (RegionDestroyedException rde) { - writeException(msg, rde, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, rde, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // If an exception occurs during the destroy, preserve the connection - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (e instanceof GemFireSecurityException) { // Fine logging for security exceptions since these are already // logged by the security logger if (logger.isDebugEnabled()) { - logger.debug("{}: Unexpected Security exception", servConn.getName(), e); + logger.debug("{}: Unexpected Security exception", serverConnection.getName(), e); } } else { logger.warn(LocalizedMessage.create(LocalizedStrings.Destroy_0_UNEXPECTED_EXCEPTION, - servConn.getName()), e); + serverConnection.getName()), e); } return; } @@ -188,17 +188,17 @@ public class Destroy extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { - writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType()); + writeReplyWithRefreshMetadata(clientMessage, serverConnection, pr, pr.getNetworkHopType()); pr.clearNetworkHopData(); } else { - writeReply(msg, servConn); + writeReply(clientMessage, serverConnection); } } else { - writeReply(msg, servConn); + writeReply(clientMessage, serverConnection); } - servConn.setAsTrue(RESPONDED); + 
serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Sent destroy response for region {} key {}", servConn.getName(), regionName, + logger.debug("{}: Sent destroy response for region {} key {}", serverConnection.getName(), regionName, key); } stats.incWriteDestroyResponseTime(DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java index 585f57d..0ee0fc4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java @@ -54,7 +54,7 @@ public class Destroy65 extends BaseCommand { } @Override - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn, + protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, PartitionedRegion pr, byte nwHop) throws IOException { throw new UnsupportedOperationException(); } @@ -72,7 +72,7 @@ public class Destroy65 extends BaseCommand { replyMsg.addIntPart(entryNotFoundForRemove ? 
1 : 0); replyMsg.send(servConn); if (logger.isTraceEnabled()) { - logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(), + logger.trace("{}: rpl with REFRESH_METADATA tx: {}", servConn.getName(), origMsg.getTransactionId()); } } @@ -84,7 +84,7 @@ public class Destroy65 extends BaseCommand { replyMsg.setMessageType(MessageType.REPLY); replyMsg.setNumberOfParts(2); replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(OK_BYTES); + replyMsg.addBytesPart(okBytes()); replyMsg.addIntPart(entryNotFound ? 1 : 0); replyMsg.send(servConn); if (logger.isTraceEnabled()) { @@ -94,7 +94,7 @@ public class Destroy65 extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart; Part keyPart; @@ -108,20 +108,20 @@ public class Destroy65 extends BaseCommand { String regionName = null; Object callbackArg = null, key = null; StringBuffer errMessage = new StringBuffer(); - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); + serverConnection.setAsTrue(REQUIRES_RESPONSE); long now = DistributionStats.getStatTime(); stats.incReadDestroyRequestTime(now - start); // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); - expectedOldValuePart = msg.getPart(2); + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); + expectedOldValuePart = clientMessage.getPart(2); try { - operation = msg.getPart(3).getObject(); + operation = clientMessage.getPart(3).getObject(); if (((operation instanceof Operation) && ((Operation) 
operation == Operation.REMOVE)) || ((operation instanceof Byte) && (Byte) operation == OpType.DESTROY)) @@ -130,20 +130,20 @@ public class Destroy65 extends BaseCommand { expectedOldValue = expectedOldValuePart.getObject(); } } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - eventPart = msg.getPart(4); + eventPart = clientMessage.getPart(4); - if (msg.getNumberOfParts() > 5) { - callbackArgPart = msg.getPart(5); + if (clientMessage.getNumberOfParts() > 5) { + callbackArgPart = clientMessage.getPart(5); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -151,16 +151,16 @@ public class Destroy65 extends BaseCommand { try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug( "{}: Received destroy65 request ({} bytes; op={}) from {} for region {} key {}{} txId {}", - servConn.getName(), msg.getPayloadLength(), operation, servConn.getSocketString(), + serverConnection.getName(), clientMessage.getPayloadLength(), operation, serverConnection.getSocketString(), regionName, key, (operation == Operation.REMOVE ? 
" value=" + expectedOldValue : ""), - msg.getTransactionId()); + clientMessage.getTransactionId()); } boolean entryNotFoundForRemove = false; @@ -169,29 +169,29 @@ public class Destroy65 extends BaseCommand { if (key == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.Destroy_0_THE_INPUT_KEY_FOR_THE_DESTROY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage.append(LocalizedStrings.Destroy__THE_INPUT_KEY_FOR_THE_DESTROY_REQUEST_IS_NULL .toLocalizedString()); } if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.Destroy_0_THE_INPUT_REGION_NAME_FOR_THE_DESTROY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage .append(LocalizedStrings.Destroy__THE_INPUT_REGION_NAME_FOR_THE_DESTROY_REQUEST_IS_NULL .toLocalizedString()); } - writeErrorResponse(msg, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.Destroy__0_WAS_NOT_FOUND_DURING_DESTROY_REQUEST .toLocalizedString(regionName); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -199,13 +199,13 @@ public class Destroy65 extends BaseCommand { ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); - EventID eventId = new 
EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId); + EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId); EventIDHolder clientEvent = new EventIDHolder(eventId); Breadcrumbs.setEventId(eventId); // msg.isRetry might be set by v7.0 and later clients - if (msg.isRetry()) { + if (clientMessage.isRetry()) { // if (logger.isDebugEnabled()) { // logger.debug("DEBUG: encountered isRetry in Destroy65"); // } @@ -223,7 +223,7 @@ public class Destroy65 extends BaseCommand { // for integrated security this.securityService.authorizeRegionWrite(regionName, key.toString()); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) { RegionDestroyOperationContext destroyContext = @@ -236,14 +236,14 @@ public class Destroy65 extends BaseCommand { } } if (operation == null || operation == Operation.DESTROY) { - region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), true, clientEvent); + region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), true, clientEvent); } else { // this throws exceptions if expectedOldValue checks fail try { if (expectedOldValue == null) { expectedOldValue = Token.INVALID; } - if (operation == Operation.REMOVE && msg.isRetry() + if (operation == Operation.REMOVE && clientMessage.isRetry() && clientEvent.getVersionTag() != null) { // the operation was successful last time it was tried, so there's // no need to perform it again. 
Just return the version tag and @@ -254,55 +254,55 @@ public class Destroy65 extends BaseCommand { } // try the operation anyway to ensure that it's been distributed to all servers try { - region.basicBridgeRemove(key, expectedOldValue, callbackArg, servConn.getProxyID(), + region.basicBridgeRemove(key, expectedOldValue, callbackArg, serverConnection.getProxyID(), true, clientEvent); } catch (EntryNotFoundException e) { // ignore, and don't set entryNotFoundForRemove because this was a successful // operation - bug #51664 } } else { - region.basicBridgeRemove(key, expectedOldValue, callbackArg, servConn.getProxyID(), + region.basicBridgeRemove(key, expectedOldValue, callbackArg, serverConnection.getProxyID(), true, clientEvent); if (logger.isDebugEnabled()) { logger.debug("region.remove succeeded"); } } } catch (EntryNotFoundException e) { - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); if (logger.isDebugEnabled()) { logger.debug("writing entryNotFound response"); } entryNotFoundForRemove = true; } } - servConn.setModificationInfo(true, regionName, key); + serverConnection.setModificationInfo(true, regionName, key); } catch (EntryNotFoundException e) { // Don't send an exception back to the client if this // exception happens. Just log it and continue. 
logger.info(LocalizedMessage.create( LocalizedStrings.Destroy_0_DURING_ENTRY_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1, - new Object[] {servConn.getName(), key})); + new Object[] { serverConnection.getName(), key})); entryNotFoundForRemove = true; } catch (RegionDestroyedException rde) { - writeException(msg, rde, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, rde, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // If an exception occurs during the destroy, preserve the connection - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (e instanceof GemFireSecurityException) { // Fine logging for security exceptions since these are already // logged by the security logger if (logger.isDebugEnabled()) { - logger.debug("{}: Unexpected Security exception", servConn.getName(), e); + logger.debug("{}: Unexpected Security exception", serverConnection.getName(), e); } } else { logger.warn(LocalizedMessage.create(LocalizedStrings.Destroy_0_UNEXPECTED_EXCEPTION, - servConn.getName()), e); + serverConnection.getName()), e); } return; } @@ -314,20 +314,20 @@ public class Destroy65 extends BaseCommand { if (region instanceof PartitionedRegion) { PartitionedRegion pr = (PartitionedRegion) region; if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { - writeReplyWithRefreshMetadata(msg, servConn, pr, entryNotFoundForRemove, + writeReplyWithRefreshMetadata(clientMessage, serverConnection, pr, entryNotFoundForRemove, pr.getNetworkHopType(), clientEvent.getVersionTag()); pr.clearNetworkHopData(); } else { - writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), + writeReply(clientMessage, 
serverConnection, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), clientEvent.getVersionTag()); } } else { - writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), + writeReply(clientMessage, serverConnection, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), clientEvent.getVersionTag()); } - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Sent destroy response for region {} key {}", servConn.getName(), regionName, + logger.debug("{}: Sent destroy response for region {} key {}", serverConnection.getName(), regionName, key); } stats.incWriteDestroyResponseTime(DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/fec1be92/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java index 59a7233..7c07c72 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java @@ -67,7 +67,7 @@ public class Destroy70 extends Destroy65 { pr.getPrStats().incPRMetaDataSentCount(); replyMsg.send(servConn); if (logger.isTraceEnabled()) { - logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(), + logger.trace("{}: rpl with REFRESH_METADATA tx: {}", servConn.getName(), origMsg.getTransactionId()); } } @@ -104,7 +104,7 @@ public class Destroy70 extends Destroy65 { // logger.fine("response has no version tag"); // } } - replyMsg.addBytesPart(OK_BYTES); // make old single-hop code happy by puting byte[]{0} here + replyMsg.addBytesPart(okBytes()); // make old 
single-hop code happy by puting byte[]{0} here replyMsg.addIntPart(entryNotFound ? 1 : 0); replyMsg.send(servConn); if (logger.isTraceEnabled()) {