http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java index f102b2d..1f9ef91 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Map; @@ -47,7 +46,7 @@ import java.util.concurrent.TimeUnit; * and serialize it out to the wire. * * <PRE> - * msgType - int - 4 bytes type of message, types enumerated below + * messageType - int - 4 bytes type of message, types enumerated below * * msgLength - int - 4 bytes total length of variable length payload * @@ -55,10 +54,10 @@ import java.util.concurrent.TimeUnit; * contained in the payload. Message can * be a multi-part message * - * transId - int - 4 bytes filled in by the requestor, copied back into + * transId - int - 4 bytes filled in by the requester, copied back into * the response * - * flags - byte- 1 byte filled in by the requestor + * flags - byte- 1 byte filled in by the requester * len1 * part1 * . @@ -76,18 +75,17 @@ import java.util.concurrent.TimeUnit; * * See also <a href="package-summary.html#messages">package description</a>. * - * @see org.apache.geode.internal.cache.tier.MessageType - * + * @see MessageType */ public class Message { - public static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824; - /** - * maximum size of an outgoing message. 
See GEODE-478 - */ - public static int MAX_MESSAGE_SIZE = - Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size", - DEFAULT_MAX_MESSAGE_SIZE).intValue(); + // Tentative workaround to avoid OOM stated in #46754. + public static final ThreadLocal<Integer> MESSAGE_TYPE = new ThreadLocal<>(); + + public static final String MAX_MESSAGE_SIZE_PROPERTY = + DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size"; + + static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824; private static final Logger logger = LogService.getLogger(); @@ -97,83 +95,89 @@ public class Message { private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>(); - private static final byte[] TRUE; - private static final byte[] FALSE; + // These two statics are fields shoved into the flags byte for transmission. + // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other + // is left in place + private static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02; + private static final byte MESSAGE_IS_RETRY = (byte) 0x04; + + private static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB; - static { - try { - HeapDataOutputStream hdos = new HeapDataOutputStream(10, null); + private static final int DEFAULT_CHUNK_SIZE = 1024; + + private static final byte[] TRUE = defineTrue(); + private static final byte[] FALSE = defineFalse(); + + private static byte[] defineTrue() { + try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) { BlobHelper.serializeTo(Boolean.TRUE, hdos); - TRUE = hdos.toByteArray(); - } catch (Exception e) { + return hdos.toByteArray(); + } catch (IOException e) { throw new IllegalStateException(e); } + } - try { - HeapDataOutputStream hdos = new HeapDataOutputStream(10, null); + private static byte[] defineFalse() { + try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) { BlobHelper.serializeTo(Boolean.FALSE, hdos); - FALSE = hdos.toByteArray(); - } catch (Exception e) { + return 
hdos.toByteArray(); + } catch (IOException e) { throw new IllegalStateException(e); } } - protected int msgType; - protected int payloadLength = 0; - protected int numberOfParts = 0; + /** + * maximum size of an outgoing message. See GEODE-478 + */ + private final int maxMessageSize; + + protected int messageType; + private int payloadLength = 0; + int numberOfParts = 0; protected int transactionId = TXManagerImpl.NOTX; - protected int currentPart = 0; - protected Part[] partsList = null; - protected ByteBuffer cachedCommBuffer; + int currentPart = 0; + private Part[] partsList = null; + private ByteBuffer cachedCommBuffer; protected Socket socket = null; - protected SocketChannel sockCh = null; - protected OutputStream os = null; - protected InputStream is = null; - protected boolean messageModified = true; + private SocketChannel socketChannel = null; + private OutputStream outputStream = null; + protected InputStream inputStream = null; + private boolean messageModified = true; + /** is this message a retry of a previously sent message? */ - protected boolean isRetry; + private boolean isRetry; + private byte flags = 0x00; - protected MessageStats msgStats = null; - protected ServerConnection sc = null; + MessageStats messageStats = null; + protected ServerConnection serverConnection = null; private int maxIncomingMessageLength = -1; private Semaphore dataLimiter = null; - // private int MAX_MSGS = -1; - private Semaphore msgLimiter = null; - private boolean hdrRead = false; - private int chunkSize = 1024;// Default Chunk Size. + private Semaphore messageLimiter = null; + private boolean readHeader = false; + private int chunkSize = DEFAULT_CHUNK_SIZE; - protected Part securePart = null; + Part securePart = null; private boolean isMetaRegion = false; - - // These two statics are fields shoved into the flags byte for transmission. 
- // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other - // is left in place - public static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02; - public static final byte MESSAGE_IS_RETRY = (byte) 0x04; - - public static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB; - - // Tentative workaround to avoid OOM stated in #46754. - public static final ThreadLocal<Integer> messageType = new ThreadLocal<Integer>(); - - Version version; + private Version version; /** * Creates a new message with the given number of parts */ public Message(int numberOfParts, Version destVersion) { + this.maxMessageSize = Integer.getInteger(MAX_MESSAGE_SIZE_PROPERTY, DEFAULT_MAX_MESSAGE_SIZE); this.version = destVersion; Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message"); - partsList = new Part[numberOfParts]; + this.partsList = new Part[numberOfParts]; this.numberOfParts = numberOfParts; - for (int i = 0; i < partsList.length; i++) { - partsList[i] = new Part(); + int partsListLength = this.partsList.length; + for (int i = 0; i < partsListLength; i++) { + this.partsList[i] = new Part(); } } public boolean isSecureMode() { - return securePart != null; + return this.securePart != null; } public byte[] getSecureBytes() throws IOException, ClassNotFoundException { @@ -186,7 +190,7 @@ public class Message { throw new IllegalArgumentException( LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString()); } - this.msgType = msgType; + this.messageType = msgType; } public void setVersion(Version clientVersion) { @@ -194,17 +198,15 @@ public class Message { } public void setMessageHasSecurePartFlag() { - this.flags = (byte) (this.flags | MESSAGE_HAS_SECURE_PART); + this.flags |= MESSAGE_HAS_SECURE_PART; } public void clearMessageHasSecurePartFlag() { - this.flags = (byte) (this.flags & MESSAGE_HAS_SECURE_PART); + this.flags &= MESSAGE_HAS_SECURE_PART; } /** * Sets and builds the {@link Part}s that are sent in the payload of the Message 
- * - * @param numberOfParts */ public void setNumberOfParts(int numberOfParts) { // hitesh: need to add security header here from server @@ -227,9 +229,7 @@ public class Message { } /** - * For boundary testing we may need to inject mock parts - * - * @param parts + * For boundary testing we may need to inject mock parts. For testing only. */ void setParts(Part[] parts) { this.partsList = parts; @@ -260,7 +260,7 @@ public class Message { /** * When building a Message this will return the number of the next Part to be added to the message */ - public int getNextPartNumber() { + int getNextPartNumber() { return this.currentPart; } @@ -268,33 +268,36 @@ public class Message { addStringPart(str, false); } - private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<String, byte[]>(); + private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<>(); public void addStringPart(String str, boolean enableCaching) { if (str == null) { - addRawPart((byte[]) null, false); - } else { - Part part = partsList[this.currentPart]; - if (enableCaching) { - byte[] bytes = CACHED_STRINGS.get(str); - if (bytes == null) { - HeapDataOutputStream hdos = new HeapDataOutputStream(str); + addRawPart(null, false); + return; + } + + Part part = this.partsList[this.currentPart]; + if (enableCaching) { + byte[] bytes = CACHED_STRINGS.get(str); + if (bytes == null) { + try (HeapDataOutputStream hdos = new HeapDataOutputStream(str)) { bytes = hdos.toByteArray(); CACHED_STRINGS.put(str, bytes); } - part.setPartState(bytes, false); - } else { - HeapDataOutputStream hdos = new HeapDataOutputStream(str); - this.messageModified = true; - part.setPartState(hdos, false); } - this.currentPart++; + part.setPartState(bytes, false); + + } else { + // do NOT close the HeapDataOutputStream + this.messageModified = true; + part.setPartState(new HeapDataOutputStream(str), false); } + this.currentPart++; } /* - * Adds a new part to this message that contains a 
<code>byte</code> array (as opposed to a - * serialized object). + * Adds a new part to this message that contains a {@code byte} array (as opposed to a serialized + * object). * * @see #addPart(byte[], boolean) */ @@ -312,13 +315,6 @@ public class Message { } } - public void addDeltaPart(HeapDataOutputStream hdos) { - this.messageModified = true; - Part part = partsList[this.currentPart]; - part.setPartState(hdos, false); - this.currentPart++; - } - public void addObjPart(Object o) { addObjPart(o, false); } @@ -345,6 +341,9 @@ public class Message { } } + /** + * Object o is always null + */ public void addPartInAnyForm(@Unretained Object o, boolean isObject) { if (o == null) { addRawPart((byte[]) o, false); @@ -353,7 +352,7 @@ public class Message { } else if (o instanceof StoredObject) { // It is possible it is an off-heap StoredObject that contains a simple non-object byte[]. this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setPartState((StoredObject) o, isObject); this.currentPart++; } else { @@ -362,59 +361,58 @@ public class Message { } private void serializeAndAddPartNoCopying(Object o) { - HeapDataOutputStream hdos; - Version v = version; - if (version.equals(Version.CURRENT)) { + Version v = this.version; + if (this.version.equals(Version.CURRENT)) { v = null; } - // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources - // passed to it. - hdos = new HeapDataOutputStream(chunkSize, v, true); + + // Create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources + // passed to it. Do NOT close the HeapDataOutputStream! 
+ HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v, true); try { BlobHelper.serializeTo(o, hdos); } catch (IOException ex) { throw new SerializationException("failed serializing object", ex); } this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setPartState(hdos, true); this.currentPart++; - } private void serializeAndAddPart(Object o, boolean zipValues) { if (zipValues) { throw new UnsupportedOperationException("zipValues no longer supported"); + } - } else { - HeapDataOutputStream hdos; - Version v = version; - if (version.equals(Version.CURRENT)) { - v = null; - } - hdos = new HeapDataOutputStream(chunkSize, v); - try { - BlobHelper.serializeTo(o, hdos); - } catch (IOException ex) { - throw new SerializationException("failed serializing object", ex); - } - this.messageModified = true; - Part part = partsList[this.currentPart]; - part.setPartState(hdos, true); - this.currentPart++; + Version v = this.version; + if (this.version.equals(Version.CURRENT)) { + v = null; } + + // do NOT close the HeapDataOutputStream + HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v); + try { + BlobHelper.serializeTo(o, hdos); + } catch (IOException ex) { + throw new SerializationException("failed serializing object", ex); + } + this.messageModified = true; + Part part = this.partsList[this.currentPart]; + part.setPartState(hdos, true); + this.currentPart++; } public void addIntPart(int v) { this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setInt(v); this.currentPart++; } public void addLongPart(long v) { this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setLong(v); this.currentPart++; } @@ -424,13 +422,13 @@ public class Message { */ public void addRawPart(byte[] newPart, boolean isObject) { this.messageModified = 
true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setPartState(newPart, isObject); this.currentPart++; } public int getMessageType() { - return this.msgType; + return this.messageType; } public int getPayloadLength() { @@ -451,7 +449,7 @@ public class Message { public Part getPart(int index) { if (index < this.numberOfParts) { - Part p = partsList[index]; + Part p = this.partsList[index]; if (this.version != null) { p.setVersion(this.version); } @@ -480,9 +478,9 @@ public class Message { if (len != 0) { this.payloadLength = 0; } - if (this.hdrRead) { - if (this.msgStats != null) { - this.msgStats.decMessagesBeingReceived(len); + if (this.readHeader) { + if (this.messageStats != null) { + this.messageStats.decMessagesBeingReceived(len); } } ByteBuffer buffer = getCommBuffer(); @@ -495,20 +493,19 @@ public class Message { this.dataLimiter = null; this.maxIncomingMessageLength = 0; } - if (this.hdrRead) { - if (this.msgLimiter != null) { - this.msgLimiter.release(1); - this.msgLimiter = null; + if (this.readHeader) { + if (this.messageLimiter != null) { + this.messageLimiter.release(1); + this.messageLimiter = null; } - this.hdrRead = false; + this.readHeader = false; } this.flags = 0; } protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) { - // hitesh: setting second bit of flags byte for client - // this is not require but this makes all changes easily at client side right now - // just see this bit and process security header + // setting second bit of flags byte for client this is not require but this makes all changes + // easily at client side right now just see this bit and process security header byte flagsByte = this.flags; if (isSecurityHeader) { flagsByte |= MESSAGE_HAS_SECURE_PART; @@ -516,14 +513,14 @@ public class Message { if (this.isRetry) { flagsByte |= MESSAGE_IS_RETRY; } - getCommBuffer().putInt(this.msgType).putInt(msgLen).putInt(this.numberOfParts) + 
getCommBuffer().putInt(this.messageType).putInt(msgLen).putInt(this.numberOfParts) .putInt(this.transactionId).put(flagsByte); } protected Part getSecurityPart() { - if (this.sc != null) { + if (this.serverConnection != null) { // look types right put get etc - return this.sc.updateAndGetSecurityPart(); + return this.serverConnection.updateAndGetSecurityPart(); } return null; } @@ -537,7 +534,7 @@ public class Message { this.isMetaRegion = isMetaRegion; } - public boolean getAndResetIsMetaRegion() { + boolean getAndResetIsMetaRegion() { boolean isMetaRegion = this.isMetaRegion; this.isMetaRegion = false; return isMetaRegion; @@ -546,21 +543,20 @@ public class Message { /** * Sends this message out on its socket. */ - protected void sendBytes(boolean clearMessage) throws IOException { - if (this.sc != null) { + void sendBytes(boolean clearMessage) throws IOException { + if (this.serverConnection != null) { // Keep track of the fact that we are making progress. - this.sc.updateProcessingMessage(); + this.serverConnection.updateProcessingMessage(); } if (this.socket == null) { throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString()); } try { - final ByteBuffer cb = getCommBuffer(); - if (cb == null) { + final ByteBuffer commBuffer = getCommBuffer(); + if (commBuffer == null) { throw new IOException("No buffer"); } - int msgLen = 0; - synchronized (cb) { + synchronized (commBuffer) { long totalPartLen = 0; long headerLen = 0; int partsToTransmit = this.numberOfParts; @@ -581,50 +577,50 @@ public class Message { partsToTransmit++; } - if ((headerLen + totalPartLen) > Integer.MAX_VALUE) { + if (headerLen + totalPartLen > Integer.MAX_VALUE) { throw new MessageTooLargeException( "Message size (" + (headerLen + totalPartLen) + ") exceeds maximum integer value"); } - msgLen = (int) (headerLen + totalPartLen); + int msgLen = (int) (headerLen + totalPartLen); - if (msgLen > MAX_MESSAGE_SIZE) { + if (msgLen > this.maxMessageSize) { throw new 
MessageTooLargeException("Message size (" + msgLen - + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")"); + + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")"); } - cb.clear(); - packHeaderInfoForSending(msgLen, (securityPart != null)); + commBuffer.clear(); + packHeaderInfoForSending(msgLen, securityPart != null); for (int i = 0; i < partsToTransmit; i++) { - Part part = (i == this.numberOfParts) ? securityPart : partsList[i]; + Part part = i == this.numberOfParts ? securityPart : this.partsList[i]; - if (cb.remaining() < PART_HEADER_SIZE) { + if (commBuffer.remaining() < PART_HEADER_SIZE) { flushBuffer(); } int partLen = part.getLength(); - cb.putInt(partLen); - cb.put(part.getTypeCode()); - if (partLen <= cb.remaining()) { - part.writeTo(cb); + commBuffer.putInt(partLen); + commBuffer.put(part.getTypeCode()); + if (partLen <= commBuffer.remaining()) { + part.writeTo(commBuffer); } else { flushBuffer(); - if (this.sockCh != null) { - part.writeTo(this.sockCh, cb); + if (this.socketChannel != null) { + part.writeTo(this.socketChannel, commBuffer); } else { - part.writeTo(this.os, cb); + part.writeTo(this.outputStream, commBuffer); } - if (this.msgStats != null) { - this.msgStats.incSentBytes(partLen); + if (this.messageStats != null) { + this.messageStats.incSentBytes(partLen); } } } - if (cb.position() != 0) { + if (commBuffer.position() != 0) { flushBuffer(); } this.messageModified = false; - if (this.sockCh == null) { - this.os.flush(); + if (this.socketChannel == null) { + this.outputStream.flush(); } } } finally { @@ -634,69 +630,67 @@ public class Message { } } - protected void flushBuffer() throws IOException { + void flushBuffer() throws IOException { final ByteBuffer cb = getCommBuffer(); - if (this.sockCh != null) { + if (this.socketChannel != null) { cb.flip(); do { - this.sockCh.write(cb); + this.socketChannel.write(cb); } while (cb.remaining() > 0); } else { - this.os.write(cb.array(), 0, 
cb.position()); + this.outputStream.write(cb.array(), 0, cb.position()); } - if (this.msgStats != null) { - this.msgStats.incSentBytes(cb.position()); + if (this.messageStats != null) { + this.messageStats.incSentBytes(cb.position()); } cb.clear(); } private void read() throws IOException { clearParts(); - // TODO:Hitesh ??? for server changes make sure sc is not null as this class also used by client - // :( + // TODO: for server changes make sure sc is not null as this class also used by client readHeaderAndPayload(); } /** * Read the actual bytes of the header off the socket */ - protected void fetchHeader() throws IOException { + void fetchHeader() throws IOException { final ByteBuffer cb = getCommBuffer(); cb.clear(); - // msgType is invalidated here and can be used as an indicator - // of problems reading the message - this.msgType = MessageType.INVALID; - int hdr = 0; + // messageType is invalidated here and can be used as an indicator + // of problems reading the message + this.messageType = MessageType.INVALID; final int headerLength = getHeaderLength(); - if (this.sockCh != null) { + if (this.socketChannel != null) { cb.limit(headerLength); do { - int bytesRead = this.sockCh.read(cb); - // System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes commBuffer=" + cb); + int bytesRead = this.socketChannel.read(cb); if (bytesRead == -1) { throw new EOFException( LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER .toLocalizedString()); } - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(bytesRead); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(bytesRead); } } while (cb.remaining() > 0); cb.flip(); + } else { + int hdr = 0; do { - int bytesRead = -1; - bytesRead = this.is.read(cb.array(), hdr, headerLength - hdr); + int bytesRead = this.inputStream.read(cb.array(), hdr, headerLength - hdr); if (bytesRead == -1) { throw new EOFException( 
LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER .toLocalizedString()); } hdr += bytesRead; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(bytesRead); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(bytesRead); } } while (hdr < headerLength); @@ -717,34 +711,36 @@ public class Message { if (!MessageType.validate(type)) { throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER - .toLocalizedString(Integer.valueOf(type))); + .toLocalizedString(type)); } + int timeToWait = 0; - if (this.sc != null) { + if (this.serverConnection != null) { // Keep track of the fact that a message is being processed. - this.sc.setProcessingMessage(); - timeToWait = sc.getClientReadTimeout(); + this.serverConnection.setProcessingMessage(); + timeToWait = this.serverConnection.getClientReadTimeout(); } - this.hdrRead = true; - if (this.msgLimiter != null) { + this.readHeader = true; + + if (this.messageLimiter != null) { for (;;) { - this.sc.getCachedRegionHelper().checkCancelInProgress(null); + this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null); boolean interrupted = Thread.interrupted(); try { if (timeToWait == 0) { - this.msgLimiter.acquire(1); + this.messageLimiter.acquire(1); } else { - if (!this.msgLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) { - if (this.msgStats != null && this.msgStats instanceof CacheServerStats) { - ((CacheServerStats) this.msgStats).incConnectionsTimedOut(); + if (!this.messageLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) { + if (this.messageStats instanceof CacheServerStats) { + ((CacheServerStats) this.messageStats).incConnectionsTimedOut(); } throw new IOException( LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS - .toLocalizedString(Integer.valueOf(timeToWait))); + .toLocalizedString(timeToWait)); } } break; - } catch 
(InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -753,16 +749,17 @@ public class Message { } } // for } + if (len > 0) { if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) { throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1 - .toLocalizedString(new Object[] {Integer.valueOf(len), - Integer.valueOf(this.maxIncomingMessageLength)})); + .toLocalizedString(new Object[] {len, this.maxIncomingMessageLength})); } + if (this.dataLimiter != null) { for (;;) { - if (sc != null) { - this.sc.getCachedRegionHelper().checkCancelInProgress(null); + if (this.serverConnection != null) { + this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null); } boolean interrupted = Thread.interrupted(); try { @@ -770,21 +767,21 @@ public class Message { this.dataLimiter.acquire(len); } else { int newTimeToWait = timeToWait; - if (this.msgLimiter != null) { + if (this.messageLimiter != null) { // may have waited for msg limit so recalc time to wait - newTimeToWait -= (int) sc.getCurrentMessageProcessingTime(); + newTimeToWait -= (int) this.serverConnection.getCurrentMessageProcessingTime(); } if (newTimeToWait <= 0 - || !this.msgLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) { + || !this.messageLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) { throw new IOException( LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS .toLocalizedString(timeToWait)); } } - this.payloadLength = len; // makes sure payloadLength gets set now so we will release - // the semaphore + // makes sure payloadLength gets set now so we will release the semaphore + this.payloadLength = len; break; // success - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -794,15 +791,15 @@ public class Message { } 
} } - if (this.msgStats != null) { - this.msgStats.incMessagesBeingReceived(len); + if (this.messageStats != null) { + this.messageStats.incMessagesBeingReceived(len); this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear } this.isRetry = (bits & MESSAGE_IS_RETRY) != 0; - bits = (byte) (bits & MESSAGE_IS_RETRY_MASK); + bits &= MESSAGE_IS_RETRY_MASK; this.flags = bits; - this.msgType = type; + this.messageType = type; readPayloadFields(numParts, len); @@ -813,32 +810,38 @@ public class Message { // this.numberOfParts = numParts; Already set in setPayloadFields via setNumberOfParts this.transactionId = txid; this.flags = bits; - if (this.sc != null) { + if (this.serverConnection != null) { // Keep track of the fact that a message is being processed. - this.sc.updateProcessingMessage(); + this.serverConnection.updateProcessingMessage(); } } - protected void readPayloadFields(final int numParts, final int len) throws IOException { + /** + * TODO: refactor overly long method readPayloadFields + */ + void readPayloadFields(final int numParts, final int len) throws IOException { if (len > 0 && numParts <= 0 || len <= 0 && numParts > 0) { throw new IOException( LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT - .toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(numParts)})); + .toLocalizedString(new Object[] {len, numParts})); } - Integer msgType = messageType.get(); + Integer msgType = MESSAGE_TYPE.get(); if (msgType != null && msgType == MessageType.PING) { - messageType.set(null); // set it to null right away. - int pingParts = 10; // Some number which will not throw OOM but still be acceptable for a ping - // operation. + // set it to null right away. + MESSAGE_TYPE.set(null); + // Some number which will not throw OOM but still be acceptable for a ping operation. 
+ int pingParts = 10; if (numParts > pingParts) { throw new IOException("Part length ( " + numParts + " ) is inconsistent for " + MessageType.getString(msgType) + " operation."); } } + setNumberOfParts(numParts); - if (numParts <= 0) + if (numParts <= 0) { return; + } if (len < 0) { logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, len)); @@ -849,12 +852,11 @@ public class Message { cb.clear(); cb.flip(); - int readSecurePart = 0; - readSecurePart = checkAndSetSecurityPart(); + int readSecurePart = checkAndSetSecurityPart(); int bytesRemaining = len; - for (int i = 0; ((i < numParts + readSecurePart) - || ((readSecurePart == 1) && (cb.remaining() > 0))); i++) { + for (int i = 0; i < numParts + readSecurePart + || readSecurePart == 1 && cb.remaining() > 0; i++) { int bytesReadThisTime = readPartChunk(bytesRemaining); bytesRemaining -= bytesReadThisTime; @@ -869,6 +871,7 @@ public class Message { int partLen = cb.getInt(); byte partType = cb.get(); byte[] partBytes = null; + if (partLen > 0) { partBytes = new byte[partLen]; int alreadyReadBytes = cb.remaining(); @@ -878,26 +881,27 @@ public class Message { } cb.get(partBytes, 0, alreadyReadBytes); } + // now we need to read partLen - alreadyReadBytes off the wire int off = alreadyReadBytes; int remaining = partLen - off; while (remaining > 0) { - if (this.sockCh != null) { + if (this.socketChannel != null) { int bytesThisTime = remaining; cb.clear(); if (bytesThisTime > cb.capacity()) { bytesThisTime = cb.capacity(); } cb.limit(bytesThisTime); - int res = this.sockCh.read(cb); + int res = this.socketChannel.read(cb); if (res != -1) { cb.flip(); bytesRemaining -= res; remaining -= res; cb.get(partBytes, off, res); off += res; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(res); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(res); } } else { throw new EOFException( @@ -905,14 +909,13 @@ public class Message { .toLocalizedString()); } } else { - int 
res = 0; - res = this.is.read(partBytes, off, remaining); + int res = this.inputStream.read(partBytes, off, remaining); if (res != -1) { bytesRemaining -= res; remaining -= res; off += res; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(res); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(res); } } else { throw new EOFException( @@ -941,35 +944,38 @@ public class Message { * @return the number of bytes read into commBuffer */ private int readPartChunk(int bytesRemaining) throws IOException { - final ByteBuffer cb = getCommBuffer(); - if (cb.remaining() >= PART_HEADER_SIZE) { + final ByteBuffer commBuffer = getCommBuffer(); + if (commBuffer.remaining() >= PART_HEADER_SIZE) { // we already have the next part header in commBuffer so just return return 0; } - if (cb.position() != 0) { - cb.compact(); + + if (commBuffer.position() != 0) { + commBuffer.compact(); } else { - cb.position(cb.limit()); - cb.limit(cb.capacity()); + commBuffer.position(commBuffer.limit()); + commBuffer.limit(commBuffer.capacity()); } - int bytesRead = 0; - if (this.sc != null) { + + if (this.serverConnection != null) { // Keep track of the fact that we are making progress - this.sc.updateProcessingMessage(); + this.serverConnection.updateProcessingMessage(); } - if (this.sockCh != null) { - int remaining = cb.remaining(); + int bytesRead = 0; + + if (this.socketChannel != null) { + int remaining = commBuffer.remaining(); if (remaining > bytesRemaining) { remaining = bytesRemaining; - cb.limit(cb.position() + bytesRemaining); + commBuffer.limit(commBuffer.position() + bytesRemaining); } while (remaining > 0) { - int res = this.sockCh.read(cb); + int res = this.socketChannel.read(commBuffer); if (res != -1) { remaining -= res; bytesRead += res; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(res); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(res); } } else { throw new EOFException( @@ -979,21 +985,20 @@ public 
class Message { } } else { - int bufSpace = cb.capacity() - cb.position(); - int bytesToRead = bufSpace; + int bytesToRead = commBuffer.capacity() - commBuffer.position(); if (bytesRemaining < bytesToRead) { bytesToRead = bytesRemaining; } - int pos = cb.position(); + int pos = commBuffer.position(); + while (bytesToRead > 0) { - int res = 0; - res = this.is.read(cb.array(), pos, bytesToRead); + int res = this.inputStream.read(commBuffer.array(), pos, bytesToRead); if (res != -1) { bytesToRead -= res; pos += res; bytesRead += res; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(res); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(res); } } else { throw new EOFException( @@ -1001,9 +1006,10 @@ public class Message { .toLocalizedString()); } } - cb.position(pos); + + commBuffer.position(pos); } - cb.flip(); + commBuffer.flip(); return bytesRead; } @@ -1011,40 +1017,39 @@ public class Message { * Gets rid of all the parts that have been added to this message. 
*/ public void clearParts() { - for (int i = 0; i < partsList.length; i++) { - partsList[i].clear(); + for (Part part : this.partsList) { + part.clear(); } this.currentPart = 0; } @Override public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("type=").append(MessageType.getString(msgType)); - sb.append("; payloadLength=").append(payloadLength); - sb.append("; numberOfParts=").append(numberOfParts); - sb.append("; transactionId=").append(transactionId); - sb.append("; currentPart=").append(currentPart); - sb.append("; messageModified=").append(messageModified); - sb.append("; flags=").append(Integer.toHexString(flags)); - for (int i = 0; i < numberOfParts; i++) { + StringBuilder sb = new StringBuilder(); + sb.append("type=").append(MessageType.getString(this.messageType)); + sb.append("; payloadLength=").append(this.payloadLength); + sb.append("; numberOfParts=").append(this.numberOfParts); + sb.append("; transactionId=").append(this.transactionId); + sb.append("; currentPart=").append(this.currentPart); + sb.append("; messageModified=").append(this.messageModified); + sb.append("; flags=").append(Integer.toHexString(this.flags)); + for (int i = 0; i < this.numberOfParts; i++) { sb.append("; part[").append(i).append("]={"); - sb.append(this.partsList[i].toString()); + sb.append(this.partsList[i]); sb.append("}"); } return sb.toString(); } - - public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) + void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { - this.sc = sc; + this.serverConnection = sc; setComms(socket, bb, msgStats); } - public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { - this.sockCh = socket.getChannel(); - if (this.sockCh == null) { + void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { + this.socketChannel = socket.getChannel(); + if (this.socketChannel 
== null) { setComms(socket, socket.getInputStream(), socket.getOutputStream(), bb, msgStats); } else { setComms(socket, null, null, bb, msgStats); @@ -1052,14 +1057,14 @@ public class Message { } public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb, - MessageStats msgStats) throws IOException { + MessageStats msgStats) { Assert.assertTrue(socket != null); this.socket = socket; - this.sockCh = socket.getChannel(); - this.is = is; - this.os = os; + this.socketChannel = socket.getChannel(); + this.inputStream = is; + this.outputStream = os; this.cachedCommBuffer = bb; - this.msgStats = msgStats; + this.messageStats = msgStats; } /** @@ -1069,11 +1074,11 @@ public class Message { */ public void unsetComms() { this.socket = null; - this.sockCh = null; - this.is = null; - this.os = null; + this.socketChannel = null; + this.inputStream = null; + this.outputStream = null; this.cachedCommBuffer = null; - this.msgStats = null; + this.messageStats = null; } /** @@ -1084,7 +1089,7 @@ public class Message { } public void send(ServerConnection servConn) throws IOException { - if (this.sc != servConn) + if (this.serverConnection != servConn) throw new IllegalStateException("this.sc was not correctly set"); send(true); } @@ -1097,7 +1102,7 @@ public class Message { } /** - * Populates the stats of this <code>Message</code> with information received via its socket + * Populates the stats of this {@code Message} with information received via its socket */ public void recv() throws IOException { if (this.socket != null) { @@ -1111,10 +1116,10 @@ public class Message { public void recv(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter, Semaphore msgLimiter) throws IOException { - this.sc = sc; + this.serverConnection = sc; this.maxIncomingMessageLength = maxMessageLength; this.dataLimiter = dataLimiter; - this.msgLimiter = msgLimiter; + this.messageLimiter = msgLimiter; recv(); }
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 83d0e9d..485ccae 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -723,12 +723,10 @@ public class ServerConnection implements Runnable { ThreadState threadState = null; try { if (msg != null) { - // this.logger.fine("donormalMsg() msgType " + msg.getMessageType()); - // Since this thread is not interrupted when the cache server is - // shutdown, - // test again after a message has been read. This is a bit of a hack. I - // think this thread should be interrupted, but currently AcceptorImpl - // doesn't keep track of the threads that it launches. + // Since this thread is not interrupted when the cache server is shutdown, test again after + // a message has been read. This is a bit of a hack. I think this thread should be + // interrupted, but currently AcceptorImpl doesn't keep track of the threads that it + // launches. 
if (!this.processMessages || (crHelper.isShutdown())) { if (logger.isDebugEnabled()) { logger.debug("{} ignoring message of type {} from client {} due to shutdown.", @@ -1078,8 +1076,6 @@ public class ServerConnection implements Runnable { */ public Part updateAndGetSecurityPart() { // need to take care all message types here - // this.logger.fine("getSecurityPart() msgType = " - // + this.requestMsg.msgType); if (AcceptorImpl.isAuthenticationRequired() && this.handshake.getVersion().compareTo(Version.GFE_65) >= 0 && (this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY) @@ -1090,40 +1086,40 @@ public class ServerConnection implements Runnable { if (AcceptorImpl.isAuthenticationRequired() && logger.isDebugEnabled()) { logger.debug( "ServerConnection.updateAndGetSecurityPart() not adding security part for msg type {}", - MessageType.getString(this.requestMsg.msgType)); + MessageType.getString(this.requestMsg.messageType)); } } return null; } private boolean isInternalMessage() { - return (this.requestMsg.msgType == MessageType.CLIENT_READY - || this.requestMsg.msgType == MessageType.CLOSE_CONNECTION - || this.requestMsg.msgType == MessageType.GETCQSTATS_MSG_TYPE - || this.requestMsg.msgType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES - || this.requestMsg.msgType == MessageType.GET_CLIENT_PR_METADATA - || this.requestMsg.msgType == MessageType.INVALID - || this.requestMsg.msgType == MessageType.MAKE_PRIMARY - || this.requestMsg.msgType == MessageType.MONITORCQ_MSG_TYPE - || this.requestMsg.msgType == MessageType.PERIODIC_ACK - || this.requestMsg.msgType == MessageType.PING - || this.requestMsg.msgType == MessageType.REGISTER_DATASERIALIZERS - || this.requestMsg.msgType == MessageType.REGISTER_INSTANTIATORS - || this.requestMsg.msgType == MessageType.REQUEST_EVENT_VALUE - || this.requestMsg.msgType == MessageType.ADD_PDX_TYPE - || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_TYPE - || this.requestMsg.msgType == MessageType.GET_PDX_TYPE_BY_ID - || 
this.requestMsg.msgType == MessageType.SIZE - || this.requestMsg.msgType == MessageType.TX_FAILOVER - || this.requestMsg.msgType == MessageType.TX_SYNCHRONIZATION - || this.requestMsg.msgType == MessageType.GET_FUNCTION_ATTRIBUTES - || this.requestMsg.msgType == MessageType.ADD_PDX_ENUM - || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_ENUM - || this.requestMsg.msgType == MessageType.GET_PDX_ENUM_BY_ID - || this.requestMsg.msgType == MessageType.GET_PDX_TYPES - || this.requestMsg.msgType == MessageType.GET_PDX_ENUMS - || this.requestMsg.msgType == MessageType.COMMIT - || this.requestMsg.msgType == MessageType.ROLLBACK); + return (this.requestMsg.messageType == MessageType.CLIENT_READY + || this.requestMsg.messageType == MessageType.CLOSE_CONNECTION + || this.requestMsg.messageType == MessageType.GETCQSTATS_MSG_TYPE + || this.requestMsg.messageType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES + || this.requestMsg.messageType == MessageType.GET_CLIENT_PR_METADATA + || this.requestMsg.messageType == MessageType.INVALID + || this.requestMsg.messageType == MessageType.MAKE_PRIMARY + || this.requestMsg.messageType == MessageType.MONITORCQ_MSG_TYPE + || this.requestMsg.messageType == MessageType.PERIODIC_ACK + || this.requestMsg.messageType == MessageType.PING + || this.requestMsg.messageType == MessageType.REGISTER_DATASERIALIZERS + || this.requestMsg.messageType == MessageType.REGISTER_INSTANTIATORS + || this.requestMsg.messageType == MessageType.REQUEST_EVENT_VALUE + || this.requestMsg.messageType == MessageType.ADD_PDX_TYPE + || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_TYPE + || this.requestMsg.messageType == MessageType.GET_PDX_TYPE_BY_ID + || this.requestMsg.messageType == MessageType.SIZE + || this.requestMsg.messageType == MessageType.TX_FAILOVER + || this.requestMsg.messageType == MessageType.TX_SYNCHRONIZATION + || this.requestMsg.messageType == MessageType.GET_FUNCTION_ATTRIBUTES + || this.requestMsg.messageType == 
MessageType.ADD_PDX_ENUM + || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_ENUM + || this.requestMsg.messageType == MessageType.GET_PDX_ENUM_BY_ID + || this.requestMsg.messageType == MessageType.GET_PDX_TYPES + || this.requestMsg.messageType == MessageType.GET_PDX_ENUMS + || this.requestMsg.messageType == MessageType.COMMIT + || this.requestMsg.messageType == MessageType.ROLLBACK); } public void run() { http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java new file mode 100644 index 0000000..7118347 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.tier.sockets; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +import org.apache.geode.CancelException; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; +import org.apache.geode.distributed.internal.MessageWithReply; +import org.apache.geode.distributed.internal.ReplyMessage; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.internal.InternalDataSerializer; + +/** + * Send interest registration to another server. Since interest registration performs a state-flush + * operation this message must not be transmitted on an ordered socket. + * <p> + * Extracted from CacheClientNotifier + */ +public class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage + implements MessageWithReply { + + private ClientProxyMembershipID clientId; + private ClientInterestMessageImpl clientMessage; + private int processorId; + + ServerInterestRegistrationMessage(ClientProxyMembershipID clientId, + ClientInterestMessageImpl clientInterestMessage) { + this.clientId = clientId; + this.clientMessage = clientInterestMessage; + } + + public ServerInterestRegistrationMessage() { + // deserializing in fromData + } + + static void sendInterestChange(DM dm, ClientProxyMembershipID clientId, + ClientInterestMessageImpl clientInterestMessage) { + ServerInterestRegistrationMessage registrationMessage = + new ServerInterestRegistrationMessage(clientId, clientInterestMessage); + + Set recipients = dm.getOtherDistributionManagerIds(); + registrationMessage.setRecipients(recipients); + + ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, recipients); + registrationMessage.processorId = replyProcessor.getProcessorId(); + + dm.putOutgoing(registrationMessage); + + try { + 
replyProcessor.waitForReplies(); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + + @Override + protected void process(DistributionManager dm) { + // Get the proxy for the proxy id + try { + CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance(); + if (clientNotifier != null) { + CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId); + // If this VM contains a proxy for the requested proxy id, forward the + // message on to the proxy for processing + if (proxy != null) { + proxy.processInterestMessage(this.clientMessage); + } + } + } finally { + ReplyMessage reply = new ReplyMessage(); + reply.setProcessorId(this.processorId); + reply.setRecipient(getSender()); + try { + dm.putOutgoing(reply); + } catch (CancelException ignore) { + // can't send a reply, so ignore the exception + } + } + } + + @Override + public int getDSFID() { + return SERVER_INTEREST_REGISTRATION_MESSAGE; + } + + @Override + public void toData(DataOutput out) throws IOException { + super.toData(out); + out.writeInt(this.processorId); + InternalDataSerializer.invokeToData(this.clientId, out); + InternalDataSerializer.invokeToData(this.clientMessage, out); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + super.fromData(in); + this.processorId = in.readInt(); + this.clientId = new ClientProxyMembershipID(); + InternalDataSerializer.invokeFromData(this.clientId, in); + this.clientMessage = new ClientInterestMessageImpl(); + InternalDataSerializer.invokeFromData(this.clientMessage, in); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java index 1b599e9..2cb36cd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java @@ -39,29 +39,30 @@ public class AddPdxEnum extends BaseCommand { private AddPdxEnum() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException { - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); if (logger.isDebugEnabled()) { logger.debug("{}: Received get pdx id for enum request ({} parts) from {}", - servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString()); + serverConnection.getName(), clientMessage.getNumberOfParts(), + serverConnection.getSocketString()); } - int noOfParts = msg.getNumberOfParts(); + int noOfParts = clientMessage.getNumberOfParts(); - EnumInfo enumInfo = (EnumInfo) msg.getPart(0).getObject(); - int enumId = msg.getPart(1).getInt(); + EnumInfo enumInfo = (EnumInfo) clientMessage.getPart(0).getObject(); + int enumId = clientMessage.getPart(1).getInt(); try { - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); TypeRegistry registry = cache.getPdxRegistry(); registry.addRemoteEnum(enumId, enumInfo); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); } } 
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java index 9b8302e..3feba0d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java @@ -39,33 +39,34 @@ public class AddPdxType extends BaseCommand { private AddPdxType() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, ClassNotFoundException { - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); if (logger.isDebugEnabled()) { logger.debug("{}: Received get pdx id for type request ({} parts) from {}", - servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString()); + serverConnection.getName(), clientMessage.getNumberOfParts(), + serverConnection.getSocketString()); } - int noOfParts = msg.getNumberOfParts(); + int noOfParts = clientMessage.getNumberOfParts(); - PdxType type = (PdxType) msg.getPart(0).getObject(); - int typeId = msg.getPart(1).getInt(); + PdxType type = (PdxType) clientMessage.getPart(0).getObject(); + int typeId = clientMessage.getPart(1).getInt(); // The native client needs this line // because it doesn't set the type id on the // client side. 
type.setTypeId(typeId); try { - InternalCache cache = servConn.getCache(); + InternalCache cache = serverConnection.getCache(); TypeRegistry registry = cache.getPdxRegistry(); registry.addRemoteType(typeId, type); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); } } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java index 959430c..ab19954 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java @@ -47,15 +47,15 @@ public class ClearRegion extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException, InterruptedException { Part regionNamePart = null, callbackArgPart = null; String regionName = null; Object callbackArg = null; Part eventPart = null; - CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); - CacheServerStats stats = servConn.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); + CacheServerStats stats = serverConnection.getCacheServerStats(); + 
serverConnection.setAsTrue(REQUIRES_RESPONSE); { long oldStart = start; @@ -63,36 +63,38 @@ public class ClearRegion extends BaseCommand { stats.incReadClearRegionRequestTime(start - oldStart); } // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); - eventPart = msg.getPart(1); + regionNamePart = clientMessage.getPart(0); + eventPart = clientMessage.getPart(1); // callbackArgPart = null; (redundant assignment) - if (msg.getNumberOfParts() > 2) { - callbackArgPart = msg.getPart(2); + if (clientMessage.getNumberOfParts() > 2) { + callbackArgPart = clientMessage.getPart(2); try { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } regionName = regionNamePart.getString(); if (logger.isDebugEnabled()) { - logger.debug(servConn.getName() + ": Received clear region request (" + msg.getPayloadLength() - + " bytes) from " + servConn.getSocketString() + " for region " + regionName); + logger.debug(serverConnection.getName() + ": Received clear region request (" + + clientMessage.getPayloadLength() + " bytes) from " + serverConnection.getSocketString() + + " for region " + regionName); } // Process the clear region request if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ClearRegion_0_THE_INPUT_REGION_NAME_FOR_THE_CLEAR_REGION_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); String errMessage = LocalizedStrings.ClearRegion_THE_INPUT_REGION_NAME_FOR_THE_CLEAR_REGION_REQUEST_IS_NULL .toLocalizedString(); - writeErrorResponse(msg, MessageType.CLEAR_REGION_DATA_ERROR, errMessage, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.CLEAR_REGION_DATA_ERROR, errMessage, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -100,35 +102,36 @@ 
public class ClearRegion extends BaseCommand { if (region == null) { String reason = LocalizedStrings.ClearRegion_WAS_NOT_FOUND_DURING_CLEAR_REGION_REGUEST .toLocalizedString(); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); - EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId); + EventID eventId = + new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId); try { // Clear the region this.securityService.authorizeRegionWrite(regionName); - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { RegionClearOperationContext clearContext = authzRequest.clearAuthorize(regionName, callbackArg); callbackArg = clearContext.getCallbackArg(); } - region.basicBridgeClear(callbackArg, servConn.getProxyID(), + region.basicBridgeClear(callbackArg, serverConnection.getProxyID(), true /* boolean from cache Client */, eventId); } catch (Exception e) { // If an interrupted exception is thrown , rethrow it - checkForInterrupt(servConn, e); + checkForInterrupt(serverConnection, e); // If an exception occurs during the clear, preserve the connection - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } @@ -138,10 +141,11 @@ public class ClearRegion extends BaseCommand { start = DistributionStats.getStatTime(); stats.incProcessClearRegionTime(start - oldStart); } - 
writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug(servConn.getName() + ": Sent clear region response for region " + regionName); + logger.debug( + serverConnection.getName() + ": Sent clear region response for region " + regionName); } stats.incWriteClearRegionResponseTime(DistributionStats.getStatTime() - start); } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java index d50e522..cf9c470 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java @@ -35,34 +35,36 @@ public class ClientReady extends BaseCommand { private ClientReady() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - CacheServerStats stats = servConn.getCacheServerStats(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) + throws IOException { + CacheServerStats stats = serverConnection.getCacheServerStats(); { long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadClientReadyRequestTime(start - oldStart); } try { - String clientHost = servConn.getSocketHost(); - int clientPort = servConn.getSocketPort(); + String clientHost = serverConnection.getSocketHost(); + int clientPort = serverConnection.getSocketPort(); if (logger.isDebugEnabled()) { logger.debug("{}: Received client ready request ({} bytes) from {} on 
{}:{}", - servConn.getName(), msg.getPayloadLength(), servConn.getProxyID(), clientHost, - clientPort); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getProxyID(), clientHost, clientPort); } - servConn.getAcceptor().getCacheClientNotifier().readyForEvents(servConn.getProxyID()); + serverConnection.getAcceptor().getCacheClientNotifier() + .readyForEvents(serverConnection.getProxyID()); long oldStart = start; start = DistributionStats.getStatTime(); stats.incProcessClientReadyTime(start - oldStart); - writeReply(msg, servConn); - servConn.setAsTrue(RESPONDED); + writeReply(clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug(servConn.getName() + ": Processed client ready request from " - + servConn.getProxyID() + " on " + clientHost + ":" + clientPort); + logger.debug(serverConnection.getName() + ": Processed client ready request from " + + serverConnection.getProxyID() + " on " + clientHost + ":" + clientPort); } } finally { stats.incWriteClientReadyResponseTime(DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java index 66045aa..21f0cad 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java @@ -39,43 +39,44 @@ public class CloseConnection extends BaseCommand { private CloseConnection() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws 
IOException { - CacheServerStats stats = servConn.getCacheServerStats(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) + throws IOException { + CacheServerStats stats = serverConnection.getCacheServerStats(); long oldStart = start; - boolean respondToClient = servConn.getClientVersion().compareTo(Version.GFE_90) >= 0; + boolean respondToClient = serverConnection.getClientVersion().compareTo(Version.GFE_90) >= 0; start = DistributionStats.getStatTime(); stats.incReadCloseConnectionRequestTime(start - oldStart); if (respondToClient) { // newer clients will wait for a response or EOFException - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); } try { - servConn.setClientDisconnectCleanly(); - String clientHost = servConn.getSocketHost(); - int clientPort = servConn.getSocketPort(); + serverConnection.setClientDisconnectCleanly(); + String clientHost = serverConnection.getSocketHost(); + int clientPort = serverConnection.getSocketPort(); if (logger.isDebugEnabled()) { - logger.debug("{}: Received close request ({} bytes) from {}:{}", servConn.getName(), - msg.getPayloadLength(), clientHost, clientPort); + logger.debug("{}: Received close request ({} bytes) from {}:{}", serverConnection.getName(), + clientMessage.getPayloadLength(), clientHost, clientPort); } - Part keepalivePart = msg.getPart(0); + Part keepalivePart = clientMessage.getPart(0); byte[] keepaliveByte = keepalivePart.getSerializedForm(); boolean keepalive = (keepaliveByte == null || keepaliveByte[0] == 0) ? 
false : true; - servConn.getAcceptor().getCacheClientNotifier().setKeepAlive(servConn.getProxyID(), - keepalive); + serverConnection.getAcceptor().getCacheClientNotifier() + .setKeepAlive(serverConnection.getProxyID(), keepalive); if (logger.isDebugEnabled()) { - logger.debug("{}: Processed close request from {}:{}, keepAlive: {}", servConn.getName(), - clientHost, clientPort, keepalive); + logger.debug("{}: Processed close request from {}:{}, keepAlive: {}", + serverConnection.getName(), clientHost, clientPort, keepalive); } } finally { if (respondToClient) { - writeReply(msg, servConn); + writeReply(clientMessage, serverConnection); } - servConn.setFlagProcessMessagesAsFalse(); + serverConnection.setFlagProcessMessagesAsFalse(); stats.incProcessCloseConnectionTime(DistributionStats.getStatTime() - start); } http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java index 55ef09b..366d77c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java @@ -50,12 +50,13 @@ public class CommitCommand extends BaseCommand { private CommitCommand() {} @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { - servConn.setAsTrue(REQUIRES_RESPONSE); - TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager(); + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) + throws IOException { + serverConnection.setAsTrue(REQUIRES_RESPONSE); + 
TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); InternalDistributedMember client = - (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); - int uniqId = msg.getTransactionId(); + (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); + int uniqId = clientMessage.getTransactionId(); TXId txId = new TXId(client, uniqId); TXCommitMessage commitMsg = null; if (txMgr.isHostedTxRecentlyCompleted(txId)) { @@ -64,11 +65,11 @@ public class CommitCommand extends BaseCommand { logger.debug("TX: returning a recently committed txMessage for tx: {}", txId); } if (!txMgr.isExceptionToken(commitMsg)) { - writeCommitResponse(commitMsg, msg, servConn); + writeCommitResponse(commitMsg, clientMessage, serverConnection); commitMsg.setClientVersion(null); // fixes bug 46529 - servConn.setAsTrue(RESPONDED); + serverConnection.setAsTrue(RESPONDED); } else { - sendException(msg, servConn, txMgr.getExceptionForToken(commitMsg, txId)); + sendException(clientMessage, serverConnection, txMgr.getExceptionForToken(commitMsg, txId)); } txMgr.removeHostedTXState(txId); return; @@ -87,10 +88,10 @@ public class CommitCommand extends BaseCommand { txMgr.commit(); commitMsg = txProxy.getCommitMessage(); - writeCommitResponse(commitMsg, msg, servConn); - servConn.setAsTrue(RESPONDED); + writeCommitResponse(commitMsg, clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); } catch (Exception e) { - sendException(msg, servConn, e); + sendException(clientMessage, serverConnection, e); } finally { if (txId != null) { txMgr.removeHostedTXState(txId); @@ -115,7 +116,7 @@ public class CommitCommand extends BaseCommand { if (response != null) { response.setClientVersion(servConn.getClientVersion()); } - responseMsg.addObjPart(response, zipValues); + responseMsg.addObjPart(response, false); servConn.getCache().getCancelCriterion().checkCancelInProgress(null); if (logger.isDebugEnabled()) { 
logger.debug("TX: sending a nonNull response for transaction: {}", http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java index c1b67e1..9cb2528 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java @@ -51,34 +51,36 @@ public class ContainsKey extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) + throws IOException { Part regionNamePart = null; Part keyPart = null; String regionName = null; Object key = null; - CacheServerStats stats = servConn.getCacheServerStats(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); { long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadContainsKeyRequestTime(start - oldStart); } // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received 
containsKey request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), regionName, key); } // Process the containsKey request @@ -87,47 +89,48 @@ public class ContainsKey extends BaseCommand { if (key == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ContainsKey_0_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.ContainsKey_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL .toLocalizedString(); } if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ContainsKey_0_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.ContainsKey_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL .toLocalizedString(); } - writeErrorResponse(msg, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.ContainsKey_WAS_NOT_FOUND_DURING_CONTAINSKEY_REQUEST.toLocalizedString(); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { this.securityService.authorizeRegionRead(regionName, key.toString()); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - 
servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { try { authzRequest.containsKeyAuthorize(regionName, key); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -140,10 +143,10 @@ public class ContainsKey extends BaseCommand { start = DistributionStats.getStatTime(); stats.incProcessContainsKeyTime(start - oldStart); } - writeContainsKeyResponse(containsKey, msg, servConn); - servConn.setAsTrue(RESPONDED); + writeContainsKeyResponse(containsKey, clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Sent containsKey response for region {} key {}", servConn.getName(), + logger.debug("{}: Sent containsKey response for region {} key {}", serverConnection.getName(), regionName, key); } stats.incWriteContainsKeyResponseTime(DistributionStats.getStatTime() - start); http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java index dc8f9eb..b2ce055 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java @@ -55,34 +55,36 @@ public class ContainsKey66 
extends BaseCommand { } @Override - public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException { + public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) + throws IOException { Part regionNamePart = null, keyPart = null; String regionName = null; Object key = null; ContainsKeyOp.MODE mode; - CacheServerStats stats = servConn.getCacheServerStats(); + CacheServerStats stats = serverConnection.getCacheServerStats(); - servConn.setAsTrue(REQUIRES_RESPONSE); + serverConnection.setAsTrue(REQUIRES_RESPONSE); { long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadContainsKeyRequestTime(start - oldStart); } // Retrieve the data from the message parts - regionNamePart = msg.getPart(0); - keyPart = msg.getPart(1); - mode = ContainsKeyOp.MODE.values()[(msg.getPart(2).getInt())]; + regionNamePart = clientMessage.getPart(0); + keyPart = clientMessage.getPart(1); + mode = ContainsKeyOp.MODE.values()[(clientMessage.getPart(2).getInt())]; regionName = regionNamePart.getString(); try { key = keyPart.getStringOrObject(); } catch (Exception e) { - writeException(msg, e, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } if (logger.isDebugEnabled()) { logger.debug("{}: Received containsKey request ({} bytes) from {} for region {} key {}", - servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key); + serverConnection.getName(), clientMessage.getPayloadLength(), + serverConnection.getSocketString(), regionName, key); } // Process the containsKey request @@ -91,46 +93,47 @@ public class ContainsKey66 extends BaseCommand { if (key == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ContainsKey_0_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = 
LocalizedStrings.ContainsKey_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL .toLocalizedString(); } if (regionName == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.ContainsKey_0_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL, - servConn.getName())); + serverConnection.getName())); errMessage = LocalizedStrings.ContainsKey_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL .toLocalizedString(); } - writeErrorResponse(msg, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, servConn); - servConn.setAsTrue(RESPONDED); + writeErrorResponse(clientMessage, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, + serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName); + LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = LocalizedStrings.ContainsKey_WAS_NOT_FOUND_DURING_CONTAINSKEY_REQUEST.toLocalizedString(); - writeRegionDestroyedEx(msg, regionName, reason, servConn); - servConn.setAsTrue(RESPONDED); + writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } try { this.securityService.authorizeRegionRead(regionName, key.toString()); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } - AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { try { authzRequest.containsKeyAuthorize(regionName, key); } catch (NotAuthorizedException ex) { - writeException(msg, ex, false, servConn); - servConn.setAsTrue(RESPONDED); + writeException(clientMessage, ex, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); return; } } @@ -157,10 
+160,10 @@ public class ContainsKey66 extends BaseCommand { start = DistributionStats.getStatTime(); stats.incProcessContainsKeyTime(start - oldStart); } - writeContainsKeyResponse(containsKey, msg, servConn); - servConn.setAsTrue(RESPONDED); + writeContainsKeyResponse(containsKey, clientMessage, serverConnection); + serverConnection.setAsTrue(RESPONDED); if (logger.isDebugEnabled()) { - logger.debug("{}: Sent containsKey response for region {} key {}", servConn.getName(), + logger.debug("{}: Sent containsKey response for region {} key {}", serverConnection.getName(), regionName, key); } stats.incWriteContainsKeyResponseTime(DistributionStats.getStatTime() - start);