Clean up Message class
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d785ca31 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d785ca31 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d785ca31 Branch: refs/heads/feature/GEODE-2632-17 Commit: d785ca31b2c3abb71ab37dee9fc2629eeaafa285 Parents: 52d2e58 Author: Kirk Lund <kl...@apache.org> Authored: Mon May 22 13:47:55 2017 -0700 Committer: Kirk Lund <kl...@apache.org> Committed: Tue May 23 14:47:39 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/geode/Instantiator.java | 112 ++-- .../geode/cache/client/internal/AbstractOp.java | 2 +- .../geode/cache/client/internal/PingOp.java | 10 +- .../cache/tier/sockets/CacheClientUpdater.java | 17 +- .../cache/tier/sockets/ChunkedMessage.java | 19 +- .../internal/cache/tier/sockets/Message.java | 591 ++++++++++--------- .../cache/tier/sockets/ServerConnection.java | 65 +- .../apache/geode/internal/tcp/Connection.java | 2 +- .../org/apache/geode/internal/util/IOUtils.java | 6 +- .../cache/tier/sockets/MessageJUnitTest.java | 64 +- .../internal/JUnit4DistributedTestCase.java | 2 +- ...arallelGatewaySenderOperationsDUnitTest.java | 16 +- 12 files changed, 448 insertions(+), 458 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/Instantiator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/Instantiator.java b/geode-core/src/main/java/org/apache/geode/Instantiator.java index 3c1ca06..c727e5b 100644 --- a/geode-core/src/main/java/org/apache/geode/Instantiator.java +++ b/geode-core/src/main/java/org/apache/geode/Instantiator.java @@ -20,15 +20,15 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import 
org.apache.geode.internal.i18n.LocalizedStrings; /** - * <code>Instantiator</code> allows classes that implement {@link DataSerializable} to be registered - * with the data serialization framework. Knowledge of <code>DataSerializable</code> classes allows + * {@code Instantiator} allows classes that implement {@link DataSerializable} to be registered + * with the data serialization framework. Knowledge of {@code DataSerializable} classes allows * the framework to optimize how instances of those classes are data serialized. * * <P> * - * Ordinarily, when a <code>DataSerializable</code> object is written using + * Ordinarily, when a {@code DataSerializable} object is written using * {@link DataSerializer#writeObject(Object, java.io.DataOutput)}, a special marker class id is - * written to the stream followed by the class name of the <code>DataSerializable</code> object. + * written to the stream followed by the class name of the {@code DataSerializable} object. * After the marker class id is read by {@link DataSerializer#readObject} it performs the following * operations, * @@ -44,23 +44,20 @@ import org.apache.geode.internal.i18n.LocalizedStrings; * * </OL> * - * However, if a <code>DataSerializable</code> class is {@linkplain #register(Instantiator) + * However, if a {@code DataSerializable} class is {@linkplain #register(Instantiator) * registered} with the data serialization framework and assigned a unique class id, an important * optimization can be performed that avoid the expense of using reflection to instantiate the - * <code>DataSerializable</code> class. When the object is written using + * {@code DataSerializable} class. When the object is written using * {@link DataSerializer#writeObject(Object, java.io.DataOutput)}, the object's registered class id * is written to the stream. 
Consequently, when the data is read from the stream, the - * {@link #newInstance} method of the appropriate <code>Instantiator</code> instance is invoked to - * create an "empty" instance of the <code>DataSerializable</code> instead of using reflection to + * {@link #newInstance} method of the appropriate {@code Instantiator} instance is invoked to + * create an "empty" instance of the {@code DataSerializable} instead of using reflection to * create the new instance. * * <P> * - * Commonly, a <code>DataSerializable</code> class will register itself with the - * <code>Instantiator</code> in a static initializer as shown in the below example code. - * - * <!-- The source code for the CompanySerializer class resides in tests/com/examples/ds/User.java - * Please keep the below code snippet in sync with that file. --> + * Commonly, a {@code DataSerializable} class will register itself with the + * {@code Instantiator} in a static initializer as shown in the below example code. * * <PRE> public class User implements DataSerializable { @@ -101,22 +98,22 @@ public class User implements DataSerializable { } * </PRE> * - * <code>Instantiator</code>s may be distributed to other members of the distributed system when + * {@code Instantiator}s may be distributed to other members of the distributed system when * they are registered. Consider the following scenario in which VM1 and VM2 are members of the same * distributed system. Both VMs define the sameRegion and VM2's region replicates the contents of - * VM1's using replication. VM1 puts an instance of the above <code>User</code> class into the - * region. The act of instantiating <code>User</code> will load the <code>User</code> class and - * invoke its static initializer, thus registering the <code>Instantiator</code> with the data - * serialization framework. Because the region is a replicate, the <code>User</code> will be data - * serialized and sent to VM2. 
However, when VM2 attempts to data deserialize the <code>User</code>, - * its <code>Instantiator</code> will not necessarily be registered because <code>User</code>'s + * VM1's using replication. VM1 puts an instance of the above {@code User} class into the + * region. The act of instantiating {@code User} will load the {@code User} class and + * invoke its static initializer, thus registering the {@code Instantiator} with the data + * serialization framework. Because the region is a replicate, the {@code User} will be data + * serialized and sent to VM2. However, when VM2 attempts to data deserialize the {@code User}, + * its {@code Instantiator} will not necessarily be registered because {@code User}'s * static initializer may not have been invoked yet. As a result, an exception would be logged while - * deserializing the <code>User</code> and the replicate would not appear to have the new value. So, - * in order to ensure that the <code>Instantiator</code> is registered in VM2, the data - * serialization framework distributes a message to each member when an <code>Instantiator</code> is + * deserializing the {@code User} and the replicate would not appear to have the new value. So, + * in order to ensure that the {@code Instantiator} is registered in VM2, the data + * serialization framework distributes a message to each member when an {@code Instantiator} is * {@linkplain #register(Instantiator) registered}. * <p> - * Note that the framework does not require that an <code>Instantiator</code> be + * Note that the framework does not require that an {@code Instantiator} be * {@link java.io.Serializable}, but it does require that it provide a * {@linkplain #Instantiator(Class, int) two-argument constructor}. * @@ -133,63 +130,64 @@ public abstract class Instantiator { */ private Class<? 
extends DataSerializable> clazz; - /** The id of this <code>Instantiator</code> */ + /** The id of this {@code Instantiator} */ private int id; - /** The eventId of this <code>Instantiator</code> */ + /** The eventId of this {@code Instantiator} */ private EventID eventId; - /** The originator of this <code>Instantiator</code> */ + /** The originator of this {@code Instantiator} */ private ClientProxyMembershipID context; /** - * Registers a <code>DataSerializable</code> class with the data serialization framework. This + * Registers a {@code DataSerializable} class with the data serialization framework. This * method is usually invoked from the static initializer of a class that implements - * <code>DataSerializable</code>. + * {@code DataSerializable}. * - * @param instantiator An <code>Instantiator</code> whose {@link #newInstance} method is invoked + * @param instantiator An {@code Instantiator} whose {@link #newInstance} method is invoked * when an object is data deserialized. * - * @throws IllegalStateException If class <code>c</code> is already registered with a different - * class id, or another class has already been registered with id <code>classId</code> - * @throws NullPointerException If <code>instantiator</code> is <code>null</code>. + * @throws IllegalStateException If class {@code c} is already registered with a different + * class id, or another class has already been registered with id {@code classId} + * @throws NullPointerException If {@code instantiator} is {@code null}. */ public static synchronized void register(Instantiator instantiator) { InternalInstantiator.register(instantiator, true); } /** - * Registers a <code>DataSerializable</code> class with the data serialization framework. This + * Registers a {@code DataSerializable} class with the data serialization framework. This * method is usually invoked from the static initializer of a class that implements - * <code>DataSerializable</code>. + * {@code DataSerializable}. 
* - * @param instantiator An <code>Instantiator</code> whose {@link #newInstance} method is invoked + * @param instantiator An {@code Instantiator} whose {@link #newInstance} method is invoked * when an object is data deserialized. * - * @param distribute True if the registered <code>Instantiator</code> has to be distributed to + * @param distribute True if the registered {@code Instantiator} has to be distributed to * other members of the distributed system. Note that if distribute is set to false it may * still be distributed in some cases. * - * @throws IllegalArgumentException If class <code>c</code> is already registered with a different - * class id, or another class has already been registered with id <code>classId</code> - * @throws NullPointerException If <code>instantiator</code> is <code>null</code>. + * @throws IllegalArgumentException If class {@code c} is already registered with a different + * class id, or another class has already been registered with id {@code classId} + * @throws NullPointerException If {@code instantiator} is {@code null}. * @deprecated as of 9.0 use {@link Instantiator#register(Instantiator)} instead */ + @Deprecated public static synchronized void register(Instantiator instantiator, boolean distribute) { InternalInstantiator.register(instantiator, distribute); } /** - * Creates a new <code>Instantiator</code> that instantiates a given class. + * Creates a new {@code Instantiator} that instantiates a given class. * - * @param c The <code>DataSerializable</code> class to register. This class must have a static - * initializer that registers this <code>Instantiator</code>. - * @param classId A unique id for class <code>c</code>. The <code>classId</code> must not be zero. - * This has been an <code>int</code> since dsPhase1. + * @param c The {@code DataSerializable} class to register. This class must have a static + * initializer that registers this {@code Instantiator}. + * @param classId A unique id for class {@code c}. 
The {@code classId} must not be zero. + * This has been an {@code int} since dsPhase1. * - * @throws IllegalArgumentException If <code>c</code> does not implement - * <code>DataSerializable</code>, <code>classId</code> is less than or equal to zero. - * @throws NullPointerException If <code>c</code> is <code>null</code> + * @throws IllegalArgumentException If {@code c} does not implement + * {@code DataSerializable}, {@code classId} is less than or equal to zero. + * @throws NullPointerException If {@code c} is {@code null} */ public Instantiator(Class<? extends DataSerializable> c, int classId) { if (c == null) { @@ -205,7 +203,7 @@ public abstract class Instantiator { if (classId == 0) { throw new IllegalArgumentException(LocalizedStrings.Instantiator_CLASS_ID_0_MUST_NOT_BE_0 - .toLocalizedString(Integer.valueOf(classId))); + .toLocalizedString(classId)); } this.clazz = c; @@ -213,7 +211,7 @@ public abstract class Instantiator { } /** - * Creates a new "empty" instance of a <Code>DataSerializable</code> class whose state will be + * Creates a new "empty" instance of a {@code DataSerializable} class whose state will be * filled in by invoking its {@link DataSerializable#fromData fromData} method. * * @see DataSerializer#readObject @@ -221,29 +219,29 @@ public abstract class Instantiator { public abstract DataSerializable newInstance(); /** - * Returns the <code>DataSerializable</code> class that is instantiated by this - * <code>Instantiator</code>. + * Returns the {@code DataSerializable} class that is instantiated by this + * {@code Instantiator}. */ public Class<? extends DataSerializable> getInstantiatedClass() { return this.clazz; } /** - * Returns the unique <code>id</code> of this <code>Instantiator</code>. + * Returns the unique {@code id} of this {@code Instantiator}. */ public int getId() { return this.id; } /** - * sets the unique <code>eventId</code> of this <code>Instantiator</code>. For internal use only. 
+ * sets the unique {@code eventId} of this {@code Instantiator}. For internal use only. */ public void setEventId(Object/* EventID */ eventId) { this.eventId = (EventID) eventId; } /** - * Returns the unique <code>eventId</code> of this <code>Instantiator</code>. For internal use + * Returns the unique {@code eventId} of this {@code Instantiator}. For internal use * only. */ public Object/* EventID */ getEventId() { @@ -251,14 +249,14 @@ public abstract class Instantiator { } /** - * sets the context of this <code>Instantiator</code>. For internal use only. + * sets the context of this {@code Instantiator}. For internal use only. */ public void setContext(Object/* ClientProxyMembershipID */ context) { this.context = (ClientProxyMembershipID) context; } /** - * Returns the context of this <code>Instantiator</code>. For internal use only. + * Returns the context of this {@code Instantiator}. For internal use only. */ public Object/* ClientProxyMembershipID */ getContext() { return this.context; http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java index a0cb7d4..7af4f4f 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java @@ -228,7 +228,7 @@ public abstract class AbstractOp implements Op { protected abstract Object processResponse(Message msg) throws Exception; /** - * Return true of <code>msgType</code> indicates the operation had an error on the server. + * Return true of <code>messageType</code> indicates the operation had an error on the server. 
*/ protected abstract boolean isErrorResponse(int msgType); http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java index cc30f1c..2e52542 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java @@ -14,7 +14,6 @@ */ package org.apache.geode.cache.client.internal; -import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.Message; @@ -25,6 +24,7 @@ import org.apache.geode.internal.cache.tier.sockets.Message; * @since GemFire 5.7 */ public class PingOp { + /** * Ping the specified server to see if it is still alive * @@ -47,13 +47,13 @@ public class PingOp { /** * @throws org.apache.geode.SerializationException if serialization fails */ - public PingOpImpl() { + PingOpImpl() { super(MessageType.PING, 0); } @Override protected void processSecureBytes(Connection cnx, Message message) throws Exception { - Message.messageType.set(null); + Message.MESSAGE_TYPE.set(null); } @Override @@ -64,9 +64,9 @@ public class PingOp { @Override protected void sendMessage(Connection cnx) throws Exception { getMessage().clearMessageHasSecurePartFlag(); - startTime = System.currentTimeMillis(); + this.startTime = System.currentTimeMillis(); getMessage().send(false); - Message.messageType.set(MessageType.PING); + Message.MESSAGE_TYPE.set(MessageType.PING); } @Override 
http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java index 291db65..7698550 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java @@ -572,20 +572,9 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * the server. */ private Message initializeMessage() { - Message _message = new Message(2, Version.CURRENT); - try { - _message.setComms(socket, in, out, commBuffer, this.stats); - } catch (IOException e) { - if (!quitting()) { - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Caught following exception while attempting to initialize a server-to-client communication socket and will exit", - this, e); - } - stopProcessing(); - } - } - return _message; + Message message = new Message(2, Version.CURRENT); + message.setComms(this.socket, this.in, this.out, this.commBuffer, this.stats); + return message; } /* refinement of method inherited from Thread */ http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java index 2a5a3d7..be30061 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java @@ -22,7 +22,6 @@ import org.apache.geode.internal.logging.LogService; import java.io.EOFException; import java.io.IOException; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import org.apache.logging.log4j.Logger; @@ -36,7 +35,7 @@ import org.apache.logging.log4j.Logger; * * <PRE> * - * msgType - int - 4 bytes type of message, types enumerated below + * messageType - int - 4 bytes type of message, types enumerated below * * numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs) contained * in the payload. Message can be a multi-part message @@ -153,7 +152,7 @@ public class ChunkedMessage extends Message { public void setLastChunkAndNumParts(boolean lastChunk, int numParts) { setLastChunk(lastChunk); - if (this.sc != null && this.sc.getClientVersion().compareTo(Version.GFE_65) >= 0) { + if (this.serverConnection != null && this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) { // we us e three bits for number of parts in last chunk byte // we us e three bits for number of parts in last chunk byte byte localLastChunk = (byte) (numParts << 5); @@ -162,7 +161,7 @@ public class ChunkedMessage extends Message { } public void setServerConnection(ServerConnection servConn) { - if (this.sc != servConn) + if (this.serverConnection != servConn) throw new IllegalStateException("this.sc was not correctly set"); } @@ -209,7 +208,7 @@ public class ChunkedMessage extends Message { // Set the header and payload fields only after receiving all the // socket data, providing better message consistency in the face // of exceptional conditions (e.g. IO problems, timeouts etc.) 
- this.msgType = type; + this.messageType = type; this.numberOfParts = numParts; // Already set in setPayloadFields via setNumberOfParts this.transactionId = txid; } @@ -241,14 +240,14 @@ public class ChunkedMessage extends Message { int totalBytesRead = 0; do { int bytesRead = 0; - bytesRead = is.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead); + bytesRead = inputStream.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead); if (bytesRead == -1) { throw new EOFException( LocalizedStrings.ChunkedMessage_CHUNK_READ_ERROR_CONNECTION_RESET.toLocalizedString()); } totalBytesRead += bytesRead; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(bytesRead); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(bytesRead); } } while (totalBytesRead < CHUNK_HEADER_LENGTH); @@ -315,7 +314,7 @@ public class ChunkedMessage extends Message { * Sends a chunk of this message. */ public void sendChunk(ServerConnection servConn) throws IOException { - if (this.sc != servConn) + if (this.serverConnection != servConn) throw new IllegalStateException("this.sc was not correctly set"); sendChunk(); } @@ -355,7 +354,7 @@ public class ChunkedMessage extends Message { protected void getHeaderBytesForWrite() { final ByteBuffer cb = getCommBuffer(); cb.clear(); - cb.putInt(this.msgType); + cb.putInt(this.messageType); cb.putInt(this.numberOfParts); cb.putInt(this.transactionId); http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java index f102b2d..354ad0f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java @@ -14,6 +14,8 @@ */ package org.apache.geode.internal.cache.tier.sockets; +import static org.apache.geode.internal.util.IOUtils.close; + import org.apache.geode.SerializationException; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.Assert; @@ -34,7 +36,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Map; @@ -47,7 +48,7 @@ import java.util.concurrent.TimeUnit; * and serialize it out to the wire. * * <PRE> - * msgType - int - 4 bytes type of message, types enumerated below + * messageType - int - 4 bytes type of message, types enumerated below * * msgLength - int - 4 bytes total length of variable length payload * @@ -55,10 +56,10 @@ import java.util.concurrent.TimeUnit; * contained in the payload. Message can * be a multi-part message * - * transId - int - 4 bytes filled in by the requestor, copied back into + * transId - int - 4 bytes filled in by the requester, copied back into * the response * - * flags - byte- 1 byte filled in by the requestor + * flags - byte- 1 byte filled in by the requester * len1 * part1 * . @@ -76,18 +77,16 @@ import java.util.concurrent.TimeUnit; * * See also <a href="package-summary.html#messages">package description</a>. * - * @see org.apache.geode.internal.cache.tier.MessageType - * + * @see MessageType */ public class Message { - public static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824; - /** - * maximum size of an outgoing message. See GEODE-478 - */ - public static int MAX_MESSAGE_SIZE = - Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size", - DEFAULT_MAX_MESSAGE_SIZE).intValue(); + // Tentative workaround to avoid OOM stated in #46754. 
+ public static final ThreadLocal<Integer> MESSAGE_TYPE = new ThreadLocal<>(); + + public static final String MAX_MESSAGE_SIZE_PROPERTY = DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size"; + + static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824; private static final Logger logger = LogService.getLogger(); @@ -97,83 +96,95 @@ public class Message { private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>(); - private static final byte[] TRUE; - private static final byte[] FALSE; + // These two statics are fields shoved into the flags byte for transmission. + // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other + // is left in place + private static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02; + private static final byte MESSAGE_IS_RETRY = (byte) 0x04; - static { + private static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB; + + private static final int DEFAULT_CHUNK_SIZE = 1024; + + private static final byte[] TRUE = defineTrue(); + private static final byte[] FALSE = defineFalse(); + + private static byte[] defineTrue() { + HeapDataOutputStream hdos = new HeapDataOutputStream(10, null); try { - HeapDataOutputStream hdos = new HeapDataOutputStream(10, null); BlobHelper.serializeTo(Boolean.TRUE, hdos); - TRUE = hdos.toByteArray(); - } catch (Exception e) { + return hdos.toByteArray(); + } catch (IOException e) { throw new IllegalStateException(e); + } finally { + close(hdos); } + } + private static byte[] defineFalse() { + HeapDataOutputStream hdos = new HeapDataOutputStream(10, null); try { - HeapDataOutputStream hdos = new HeapDataOutputStream(10, null); BlobHelper.serializeTo(Boolean.FALSE, hdos); - FALSE = hdos.toByteArray(); - } catch (Exception e) { + return hdos.toByteArray(); + } catch (IOException e) { throw new IllegalStateException(e); + } finally { + close(hdos); } } - protected int msgType; - protected int payloadLength = 0; - protected int numberOfParts = 0; + /** + * maximum size of 
an outgoing message. See GEODE-478 + */ + private final int maxMessageSize; + + protected int messageType; + private int payloadLength = 0; + int numberOfParts = 0; protected int transactionId = TXManagerImpl.NOTX; - protected int currentPart = 0; - protected Part[] partsList = null; - protected ByteBuffer cachedCommBuffer; + int currentPart = 0; + private Part[] partsList = null; + private ByteBuffer cachedCommBuffer; protected Socket socket = null; - protected SocketChannel sockCh = null; - protected OutputStream os = null; - protected InputStream is = null; - protected boolean messageModified = true; + private SocketChannel socketChannel = null; + private OutputStream outputStream = null; + protected InputStream inputStream = null; + private boolean messageModified = true; + /** is this message a retry of a previously sent message? */ - protected boolean isRetry; + private boolean isRetry; + private byte flags = 0x00; - protected MessageStats msgStats = null; - protected ServerConnection sc = null; + MessageStats messageStats = null; + protected ServerConnection serverConnection = null; private int maxIncomingMessageLength = -1; private Semaphore dataLimiter = null; - // private int MAX_MSGS = -1; - private Semaphore msgLimiter = null; - private boolean hdrRead = false; - private int chunkSize = 1024;// Default Chunk Size. + private Semaphore messageLimiter = null; + private boolean readHeader = false; + private int chunkSize = DEFAULT_CHUNK_SIZE; - protected Part securePart = null; + Part securePart = null; private boolean isMetaRegion = false; - - // These two statics are fields shoved into the flags byte for transmission. 
- // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other - // is left in place - public static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02; - public static final byte MESSAGE_IS_RETRY = (byte) 0x04; - - public static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB; - - // Tentative workaround to avoid OOM stated in #46754. - public static final ThreadLocal<Integer> messageType = new ThreadLocal<Integer>(); - - Version version; + private Version version; /** * Creates a new message with the given number of parts */ public Message(int numberOfParts, Version destVersion) { + this.maxMessageSize = Integer.getInteger(MAX_MESSAGE_SIZE_PROPERTY, DEFAULT_MAX_MESSAGE_SIZE); this.version = destVersion; Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message"); - partsList = new Part[numberOfParts]; + this.partsList = new Part[numberOfParts]; this.numberOfParts = numberOfParts; - for (int i = 0; i < partsList.length; i++) { - partsList[i] = new Part(); + int partsListLength = this.partsList.length; + for (int i = 0; i < partsListLength; i++) { + this.partsList[i] = new Part(); } } public boolean isSecureMode() { - return securePart != null; + return this.securePart != null; } public byte[] getSecureBytes() throws IOException, ClassNotFoundException { @@ -186,7 +197,7 @@ public class Message { throw new IllegalArgumentException( LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString()); } - this.msgType = msgType; + this.messageType = msgType; } public void setVersion(Version clientVersion) { @@ -194,17 +205,15 @@ public class Message { } public void setMessageHasSecurePartFlag() { - this.flags = (byte) (this.flags | MESSAGE_HAS_SECURE_PART); + this.flags |= MESSAGE_HAS_SECURE_PART; } public void clearMessageHasSecurePartFlag() { - this.flags = (byte) (this.flags & MESSAGE_HAS_SECURE_PART); + this.flags &= MESSAGE_HAS_SECURE_PART; } /** * Sets and builds the {@link Part}s that are sent in the payload of the Message 
- * - * @param numberOfParts */ public void setNumberOfParts(int numberOfParts) { // hitesh: need to add security header here from server @@ -227,9 +236,7 @@ public class Message { } /** - * For boundary testing we may need to inject mock parts - * - * @param parts + * For boundary testing we may need to inject mock parts. For testing only. */ void setParts(Part[] parts) { this.partsList = parts; @@ -260,7 +267,7 @@ public class Message { /** * When building a Message this will return the number of the next Part to be added to the message */ - public int getNextPartNumber() { + int getNextPartNumber() { return this.currentPart; } @@ -268,32 +275,41 @@ public class Message { addStringPart(str, false); } - private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<String, byte[]>(); + private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<>(); public void addStringPart(String str, boolean enableCaching) { if (str == null) { - addRawPart((byte[]) null, false); - } else { - Part part = partsList[this.currentPart]; - if (enableCaching) { - byte[] bytes = CACHED_STRINGS.get(str); - if (bytes == null) { - HeapDataOutputStream hdos = new HeapDataOutputStream(str); + addRawPart(null, false); + return; + } + + Part part = this.partsList[this.currentPart]; + if (enableCaching) { + byte[] bytes = CACHED_STRINGS.get(str); + if (bytes == null) { + HeapDataOutputStream hdos = new HeapDataOutputStream(str); + try { bytes = hdos.toByteArray(); CACHED_STRINGS.put(str, bytes); + } finally { + close(hdos); } - part.setPartState(bytes, false); - } else { - HeapDataOutputStream hdos = new HeapDataOutputStream(str); - this.messageModified = true; - part.setPartState(hdos, false); } - this.currentPart++; + part.setPartState(bytes, false); + } else { + HeapDataOutputStream hdos = new HeapDataOutputStream(str); + try { + this.messageModified = true; + part.setPartState(hdos, false); + } finally { + close(hdos); + } } + this.currentPart++; } /* - 
* Adds a new part to this message that contains a <code>byte</code> array (as opposed to a + * Adds a new part to this message that contains a {@code byte} array (as opposed to a * serialized object). * * @see #addPart(byte[], boolean) @@ -312,13 +328,6 @@ public class Message { } } - public void addDeltaPart(HeapDataOutputStream hdos) { - this.messageModified = true; - Part part = partsList[this.currentPart]; - part.setPartState(hdos, false); - this.currentPart++; - } - public void addObjPart(Object o) { addObjPart(o, false); } @@ -345,6 +354,9 @@ public class Message { } } + /** + * Object o is always null + */ public void addPartInAnyForm(@Unretained Object o, boolean isObject) { if (o == null) { addRawPart((byte[]) o, false); @@ -353,7 +365,7 @@ public class Message { } else if (o instanceof StoredObject) { // It is possible it is an off-heap StoredObject that contains a simple non-object byte[]. this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setPartState((StoredObject) o, isObject); this.currentPart++; } else { @@ -362,59 +374,61 @@ public class Message { } private void serializeAndAddPartNoCopying(Object o) { - HeapDataOutputStream hdos; - Version v = version; - if (version.equals(Version.CURRENT)) { + Version v = this.version; + if (this.version.equals(Version.CURRENT)) { v = null; } + // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources // passed to it. 
- hdos = new HeapDataOutputStream(chunkSize, v, true); + HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v, true); try { BlobHelper.serializeTo(o, hdos); + this.messageModified = true; + Part part = this.partsList[this.currentPart]; + part.setPartState(hdos, true); + this.currentPart++; } catch (IOException ex) { throw new SerializationException("failed serializing object", ex); + } finally { + close(hdos); } - this.messageModified = true; - Part part = partsList[this.currentPart]; - part.setPartState(hdos, true); - this.currentPart++; - } private void serializeAndAddPart(Object o, boolean zipValues) { if (zipValues) { throw new UnsupportedOperationException("zipValues no longer supported"); - - } else { - HeapDataOutputStream hdos; - Version v = version; - if (version.equals(Version.CURRENT)) { - v = null; - } - hdos = new HeapDataOutputStream(chunkSize, v); - try { - BlobHelper.serializeTo(o, hdos); - } catch (IOException ex) { - throw new SerializationException("failed serializing object", ex); - } + } + + Version v = this.version; + if (this.version.equals(Version.CURRENT)) { + v = null; + } + + HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v); + try { + BlobHelper.serializeTo(o, hdos); this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setPartState(hdos, true); this.currentPart++; + } catch (IOException ex) { + throw new SerializationException("failed serializing object", ex); + } finally { + close(hdos); } } public void addIntPart(int v) { this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setInt(v); this.currentPart++; } public void addLongPart(long v) { this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setLong(v); this.currentPart++; } @@ -424,13 +438,13 @@ public class Message { */ public void 
addRawPart(byte[] newPart, boolean isObject) { this.messageModified = true; - Part part = partsList[this.currentPart]; + Part part = this.partsList[this.currentPart]; part.setPartState(newPart, isObject); this.currentPart++; } public int getMessageType() { - return this.msgType; + return this.messageType; } public int getPayloadLength() { @@ -451,7 +465,7 @@ public class Message { public Part getPart(int index) { if (index < this.numberOfParts) { - Part p = partsList[index]; + Part p = this.partsList[index]; if (this.version != null) { p.setVersion(this.version); } @@ -480,9 +494,9 @@ public class Message { if (len != 0) { this.payloadLength = 0; } - if (this.hdrRead) { - if (this.msgStats != null) { - this.msgStats.decMessagesBeingReceived(len); + if (this.readHeader) { + if (this.messageStats != null) { + this.messageStats.decMessagesBeingReceived(len); } } ByteBuffer buffer = getCommBuffer(); @@ -495,20 +509,18 @@ public class Message { this.dataLimiter = null; this.maxIncomingMessageLength = 0; } - if (this.hdrRead) { - if (this.msgLimiter != null) { - this.msgLimiter.release(1); - this.msgLimiter = null; + if (this.readHeader) { + if (this.messageLimiter != null) { + this.messageLimiter.release(1); + this.messageLimiter = null; } - this.hdrRead = false; + this.readHeader = false; } this.flags = 0; } protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) { - // hitesh: setting second bit of flags byte for client - // this is not require but this makes all changes easily at client side right now - // just see this bit and process security header + // setting second bit of flags byte for client this is not require but this makes all changes easily at client side right now just see this bit and process security header byte flagsByte = this.flags; if (isSecurityHeader) { flagsByte |= MESSAGE_HAS_SECURE_PART; @@ -516,14 +528,14 @@ public class Message { if (this.isRetry) { flagsByte |= MESSAGE_IS_RETRY; } - 
getCommBuffer().putInt(this.msgType).putInt(msgLen).putInt(this.numberOfParts) - .putInt(this.transactionId).put(flagsByte); + getCommBuffer().putInt(this.messageType).putInt(msgLen).putInt(this.numberOfParts) + .putInt(this.transactionId).put(flagsByte); } protected Part getSecurityPart() { - if (this.sc != null) { + if (this.serverConnection != null) { // look types right put get etc - return this.sc.updateAndGetSecurityPart(); + return this.serverConnection.updateAndGetSecurityPart(); } return null; } @@ -537,7 +549,7 @@ public class Message { this.isMetaRegion = isMetaRegion; } - public boolean getAndResetIsMetaRegion() { + boolean getAndResetIsMetaRegion() { boolean isMetaRegion = this.isMetaRegion; this.isMetaRegion = false; return isMetaRegion; @@ -546,21 +558,20 @@ public class Message { /** * Sends this message out on its socket. */ - protected void sendBytes(boolean clearMessage) throws IOException { - if (this.sc != null) { + void sendBytes(boolean clearMessage) throws IOException { + if (this.serverConnection != null) { // Keep track of the fact that we are making progress. 
- this.sc.updateProcessingMessage(); + this.serverConnection.updateProcessingMessage(); } if (this.socket == null) { throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString()); } try { - final ByteBuffer cb = getCommBuffer(); - if (cb == null) { + final ByteBuffer commBuffer = getCommBuffer(); + if (commBuffer == null) { throw new IOException("No buffer"); } - int msgLen = 0; - synchronized (cb) { + synchronized (commBuffer) { long totalPartLen = 0; long headerLen = 0; int partsToTransmit = this.numberOfParts; @@ -581,50 +592,50 @@ public class Message { partsToTransmit++; } - if ((headerLen + totalPartLen) > Integer.MAX_VALUE) { + if (headerLen + totalPartLen > Integer.MAX_VALUE) { throw new MessageTooLargeException( "Message size (" + (headerLen + totalPartLen) + ") exceeds maximum integer value"); } - msgLen = (int) (headerLen + totalPartLen); + int msgLen = (int) (headerLen + totalPartLen); - if (msgLen > MAX_MESSAGE_SIZE) { + if (msgLen > this.maxMessageSize) { throw new MessageTooLargeException("Message size (" + msgLen - + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")"); + + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")"); } - cb.clear(); - packHeaderInfoForSending(msgLen, (securityPart != null)); + commBuffer.clear(); + packHeaderInfoForSending(msgLen, securityPart != null); for (int i = 0; i < partsToTransmit; i++) { - Part part = (i == this.numberOfParts) ? securityPart : partsList[i]; + Part part = i == this.numberOfParts ? 
securityPart : this.partsList[i]; - if (cb.remaining() < PART_HEADER_SIZE) { + if (commBuffer.remaining() < PART_HEADER_SIZE) { flushBuffer(); } int partLen = part.getLength(); - cb.putInt(partLen); - cb.put(part.getTypeCode()); - if (partLen <= cb.remaining()) { - part.writeTo(cb); + commBuffer.putInt(partLen); + commBuffer.put(part.getTypeCode()); + if (partLen <= commBuffer.remaining()) { + part.writeTo(commBuffer); } else { flushBuffer(); - if (this.sockCh != null) { - part.writeTo(this.sockCh, cb); + if (this.socketChannel != null) { + part.writeTo(this.socketChannel, commBuffer); } else { - part.writeTo(this.os, cb); + part.writeTo(this.outputStream, commBuffer); } - if (this.msgStats != null) { - this.msgStats.incSentBytes(partLen); + if (this.messageStats != null) { + this.messageStats.incSentBytes(partLen); } } } - if (cb.position() != 0) { + if (commBuffer.position() != 0) { flushBuffer(); } this.messageModified = false; - if (this.sockCh == null) { - this.os.flush(); + if (this.socketChannel == null) { + this.outputStream.flush(); } } } finally { @@ -634,69 +645,67 @@ public class Message { } } - protected void flushBuffer() throws IOException { + void flushBuffer() throws IOException { final ByteBuffer cb = getCommBuffer(); - if (this.sockCh != null) { + if (this.socketChannel != null) { cb.flip(); do { - this.sockCh.write(cb); + this.socketChannel.write(cb); } while (cb.remaining() > 0); } else { - this.os.write(cb.array(), 0, cb.position()); + this.outputStream.write(cb.array(), 0, cb.position()); } - if (this.msgStats != null) { - this.msgStats.incSentBytes(cb.position()); + if (this.messageStats != null) { + this.messageStats.incSentBytes(cb.position()); } cb.clear(); } private void read() throws IOException { clearParts(); - // TODO:Hitesh ??? 
for server changes make sure sc is not null as this class also used by client - // :( + // TODO: for server changes make sure sc is not null as this class also used by client readHeaderAndPayload(); } /** * Read the actual bytes of the header off the socket */ - protected void fetchHeader() throws IOException { + void fetchHeader() throws IOException { final ByteBuffer cb = getCommBuffer(); cb.clear(); - // msgType is invalidated here and can be used as an indicator + + // messageType is invalidated here and can be used as an indicator // of problems reading the message - this.msgType = MessageType.INVALID; - - int hdr = 0; + this.messageType = MessageType.INVALID; final int headerLength = getHeaderLength(); - if (this.sockCh != null) { + if (this.socketChannel != null) { cb.limit(headerLength); do { - int bytesRead = this.sockCh.read(cb); - // System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes commBuffer=" + cb); + int bytesRead = this.socketChannel.read(cb); if (bytesRead == -1) { throw new EOFException( LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER .toLocalizedString()); } - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(bytesRead); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(bytesRead); } } while (cb.remaining() > 0); cb.flip(); + } else { + int hdr = 0; do { - int bytesRead = -1; - bytesRead = this.is.read(cb.array(), hdr, headerLength - hdr); + int bytesRead = this.inputStream.read(cb.array(), hdr, headerLength - hdr); if (bytesRead == -1) { throw new EOFException( LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER .toLocalizedString()); } hdr += bytesRead; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(bytesRead); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(bytesRead); } } while (hdr < headerLength); @@ -717,34 +726,36 @@ public class Message { if (!MessageType.validate(type)) { throw new 
IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER - .toLocalizedString(Integer.valueOf(type))); + .toLocalizedString(type)); } + int timeToWait = 0; - if (this.sc != null) { + if (this.serverConnection != null) { // Keep track of the fact that a message is being processed. - this.sc.setProcessingMessage(); - timeToWait = sc.getClientReadTimeout(); + this.serverConnection.setProcessingMessage(); + timeToWait = this.serverConnection.getClientReadTimeout(); } - this.hdrRead = true; - if (this.msgLimiter != null) { + this.readHeader = true; + + if (this.messageLimiter != null) { for (;;) { - this.sc.getCachedRegionHelper().checkCancelInProgress(null); + this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null); boolean interrupted = Thread.interrupted(); try { if (timeToWait == 0) { - this.msgLimiter.acquire(1); + this.messageLimiter.acquire(1); } else { - if (!this.msgLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) { - if (this.msgStats != null && this.msgStats instanceof CacheServerStats) { - ((CacheServerStats) this.msgStats).incConnectionsTimedOut(); + if (!this.messageLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) { + if (this.messageStats instanceof CacheServerStats) { + ((CacheServerStats) this.messageStats).incConnectionsTimedOut(); } throw new IOException( LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS - .toLocalizedString(Integer.valueOf(timeToWait))); + .toLocalizedString(timeToWait)); } } break; - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -753,16 +764,19 @@ public class Message { } } // for } + if (len > 0) { if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) { throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1 - .toLocalizedString(new Object[] {Integer.valueOf(len), - 
Integer.valueOf(this.maxIncomingMessageLength)})); + .toLocalizedString(new Object[] { + len, this.maxIncomingMessageLength + })); } + if (this.dataLimiter != null) { for (;;) { - if (sc != null) { - this.sc.getCachedRegionHelper().checkCancelInProgress(null); + if (this.serverConnection != null) { + this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null); } boolean interrupted = Thread.interrupted(); try { @@ -770,21 +784,21 @@ public class Message { this.dataLimiter.acquire(len); } else { int newTimeToWait = timeToWait; - if (this.msgLimiter != null) { + if (this.messageLimiter != null) { // may have waited for msg limit so recalc time to wait - newTimeToWait -= (int) sc.getCurrentMessageProcessingTime(); + newTimeToWait -= (int) this.serverConnection.getCurrentMessageProcessingTime(); } if (newTimeToWait <= 0 - || !this.msgLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) { + || !this.messageLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) { throw new IOException( LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS .toLocalizedString(timeToWait)); } } - this.payloadLength = len; // makes sure payloadLength gets set now so we will release - // the semaphore + // makes sure payloadLength gets set now so we will release the semaphore + this.payloadLength = len; break; // success - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -794,15 +808,15 @@ public class Message { } } } - if (this.msgStats != null) { - this.msgStats.incMessagesBeingReceived(len); + if (this.messageStats != null) { + this.messageStats.incMessagesBeingReceived(len); this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear } this.isRetry = (bits & MESSAGE_IS_RETRY) != 0; - bits = (byte) (bits & MESSAGE_IS_RETRY_MASK); + bits &= MESSAGE_IS_RETRY_MASK; this.flags = bits; - 
this.msgType = type; + this.messageType = type; readPayloadFields(numParts, len); @@ -813,32 +827,38 @@ public class Message { // this.numberOfParts = numParts; Already set in setPayloadFields via setNumberOfParts this.transactionId = txid; this.flags = bits; - if (this.sc != null) { + if (this.serverConnection != null) { // Keep track of the fact that a message is being processed. - this.sc.updateProcessingMessage(); + this.serverConnection.updateProcessingMessage(); } } - protected void readPayloadFields(final int numParts, final int len) throws IOException { + /** + * TODO: refactor overly long method readPayloadFields + */ + void readPayloadFields(final int numParts, final int len) throws IOException { if (len > 0 && numParts <= 0 || len <= 0 && numParts > 0) { throw new IOException( LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT - .toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(numParts)})); + .toLocalizedString(new Object[] { len, numParts })); } - Integer msgType = messageType.get(); + Integer msgType = MESSAGE_TYPE.get(); if (msgType != null && msgType == MessageType.PING) { - messageType.set(null); // set it to null right away. - int pingParts = 10; // Some number which will not throw OOM but still be acceptable for a ping - // operation. + // set it to null right away. + MESSAGE_TYPE.set(null); + // Some number which will not throw OOM but still be acceptable for a ping operation. 
+ int pingParts = 10; if (numParts > pingParts) { throw new IOException("Part length ( " + numParts + " ) is inconsistent for " + MessageType.getString(msgType) + " operation."); } } + setNumberOfParts(numParts); - if (numParts <= 0) + if (numParts <= 0) { return; + } if (len < 0) { logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, len)); @@ -849,12 +869,10 @@ public class Message { cb.clear(); cb.flip(); - int readSecurePart = 0; - readSecurePart = checkAndSetSecurityPart(); + int readSecurePart = checkAndSetSecurityPart(); int bytesRemaining = len; - for (int i = 0; ((i < numParts + readSecurePart) - || ((readSecurePart == 1) && (cb.remaining() > 0))); i++) { + for (int i = 0; i < numParts + readSecurePart || readSecurePart == 1 && cb.remaining() > 0; i++) { int bytesReadThisTime = readPartChunk(bytesRemaining); bytesRemaining -= bytesReadThisTime; @@ -869,6 +887,7 @@ public class Message { int partLen = cb.getInt(); byte partType = cb.get(); byte[] partBytes = null; + if (partLen > 0) { partBytes = new byte[partLen]; int alreadyReadBytes = cb.remaining(); @@ -878,26 +897,27 @@ public class Message { } cb.get(partBytes, 0, alreadyReadBytes); } + // now we need to read partLen - alreadyReadBytes off the wire int off = alreadyReadBytes; int remaining = partLen - off; while (remaining > 0) { - if (this.sockCh != null) { + if (this.socketChannel != null) { int bytesThisTime = remaining; cb.clear(); if (bytesThisTime > cb.capacity()) { bytesThisTime = cb.capacity(); } cb.limit(bytesThisTime); - int res = this.sockCh.read(cb); + int res = this.socketChannel.read(cb); if (res != -1) { cb.flip(); bytesRemaining -= res; remaining -= res; cb.get(partBytes, off, res); off += res; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(res); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(res); } } else { throw new EOFException( @@ -905,14 +925,13 @@ public class Message { .toLocalizedString()); } } else { - int res 
= 0; - res = this.is.read(partBytes, off, remaining); + int res = this.inputStream.read(partBytes, off, remaining); if (res != -1) { bytesRemaining -= res; remaining -= res; off += res; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(res); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(res); } } else { throw new EOFException( @@ -941,35 +960,38 @@ public class Message { * @return the number of bytes read into commBuffer */ private int readPartChunk(int bytesRemaining) throws IOException { - final ByteBuffer cb = getCommBuffer(); - if (cb.remaining() >= PART_HEADER_SIZE) { + final ByteBuffer commBuffer = getCommBuffer(); + if (commBuffer.remaining() >= PART_HEADER_SIZE) { // we already have the next part header in commBuffer so just return return 0; } - if (cb.position() != 0) { - cb.compact(); + + if (commBuffer.position() != 0) { + commBuffer.compact(); } else { - cb.position(cb.limit()); - cb.limit(cb.capacity()); + commBuffer.position(commBuffer.limit()); + commBuffer.limit(commBuffer.capacity()); } - int bytesRead = 0; - if (this.sc != null) { + + if (this.serverConnection != null) { // Keep track of the fact that we are making progress - this.sc.updateProcessingMessage(); + this.serverConnection.updateProcessingMessage(); } - if (this.sockCh != null) { - int remaining = cb.remaining(); + int bytesRead = 0; + + if (this.socketChannel != null) { + int remaining = commBuffer.remaining(); if (remaining > bytesRemaining) { remaining = bytesRemaining; - cb.limit(cb.position() + bytesRemaining); + commBuffer.limit(commBuffer.position() + bytesRemaining); } while (remaining > 0) { - int res = this.sockCh.read(cb); + int res = this.socketChannel.read(commBuffer); if (res != -1) { remaining -= res; bytesRead += res; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(res); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(res); } } else { throw new EOFException( @@ -979,21 +1001,20 @@ public 
class Message { } } else { - int bufSpace = cb.capacity() - cb.position(); - int bytesToRead = bufSpace; + int bytesToRead = commBuffer.capacity() - commBuffer.position(); if (bytesRemaining < bytesToRead) { bytesToRead = bytesRemaining; } - int pos = cb.position(); + int pos = commBuffer.position(); + while (bytesToRead > 0) { - int res = 0; - res = this.is.read(cb.array(), pos, bytesToRead); + int res = this.inputStream.read(commBuffer.array(), pos, bytesToRead); if (res != -1) { bytesToRead -= res; pos += res; bytesRead += res; - if (this.msgStats != null) { - this.msgStats.incReceivedBytes(res); + if (this.messageStats != null) { + this.messageStats.incReceivedBytes(res); } } else { throw new EOFException( @@ -1001,9 +1022,10 @@ public class Message { .toLocalizedString()); } } - cb.position(pos); + + commBuffer.position(pos); } - cb.flip(); + commBuffer.flip(); return bytesRead; } @@ -1011,40 +1033,39 @@ public class Message { * Gets rid of all the parts that have been added to this message. 
*/ public void clearParts() { - for (int i = 0; i < partsList.length; i++) { - partsList[i].clear(); + for (Part part : this.partsList) { + part.clear(); } this.currentPart = 0; } @Override public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("type=").append(MessageType.getString(msgType)); - sb.append("; payloadLength=").append(payloadLength); - sb.append("; numberOfParts=").append(numberOfParts); - sb.append("; transactionId=").append(transactionId); - sb.append("; currentPart=").append(currentPart); - sb.append("; messageModified=").append(messageModified); - sb.append("; flags=").append(Integer.toHexString(flags)); - for (int i = 0; i < numberOfParts; i++) { + StringBuilder sb = new StringBuilder(); + sb.append("type=").append(MessageType.getString(this.messageType)); + sb.append("; payloadLength=").append(this.payloadLength); + sb.append("; numberOfParts=").append(this.numberOfParts); + sb.append("; transactionId=").append(this.transactionId); + sb.append("; currentPart=").append(this.currentPart); + sb.append("; messageModified=").append(this.messageModified); + sb.append("; flags=").append(Integer.toHexString(this.flags)); + for (int i = 0; i < this.numberOfParts; i++) { sb.append("; part[").append(i).append("]={"); - sb.append(this.partsList[i].toString()); + sb.append(this.partsList[i]); sb.append("}"); } return sb.toString(); } - - public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) + void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { - this.sc = sc; + this.serverConnection = sc; setComms(socket, bb, msgStats); } - public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { - this.sockCh = socket.getChannel(); - if (this.sockCh == null) { + void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException { + this.socketChannel = socket.getChannel(); + if (this.socketChannel 
== null) { setComms(socket, socket.getInputStream(), socket.getOutputStream(), bb, msgStats); } else { setComms(socket, null, null, bb, msgStats); @@ -1052,14 +1073,14 @@ public class Message { } public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb, - MessageStats msgStats) throws IOException { + MessageStats msgStats) { Assert.assertTrue(socket != null); this.socket = socket; - this.sockCh = socket.getChannel(); - this.is = is; - this.os = os; + this.socketChannel = socket.getChannel(); + this.inputStream = is; + this.outputStream = os; this.cachedCommBuffer = bb; - this.msgStats = msgStats; + this.messageStats = msgStats; } /** @@ -1069,11 +1090,11 @@ public class Message { */ public void unsetComms() { this.socket = null; - this.sockCh = null; - this.is = null; - this.os = null; + this.socketChannel = null; + this.inputStream = null; + this.outputStream = null; this.cachedCommBuffer = null; - this.msgStats = null; + this.messageStats = null; } /** @@ -1084,7 +1105,7 @@ public class Message { } public void send(ServerConnection servConn) throws IOException { - if (this.sc != servConn) + if (this.serverConnection != servConn) throw new IllegalStateException("this.sc was not correctly set"); send(true); } @@ -1097,7 +1118,7 @@ public class Message { } /** - * Populates the stats of this <code>Message</code> with information received via its socket + * Populates the stats of this {@code Message} with information received via its socket */ public void recv() throws IOException { if (this.socket != null) { @@ -1111,10 +1132,10 @@ public class Message { public void recv(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter, Semaphore msgLimiter) throws IOException { - this.sc = sc; + this.serverConnection = sc; this.maxIncomingMessageLength = maxMessageLength; this.dataLimiter = dataLimiter; - this.msgLimiter = msgLimiter; + this.messageLimiter = msgLimiter; recv(); } 
http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 83d0e9d..dfda14f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -723,12 +723,7 @@ public class ServerConnection implements Runnable { ThreadState threadState = null; try { if (msg != null) { - // this.logger.fine("donormalMsg() msgType " + msg.getMessageType()); - // Since this thread is not interrupted when the cache server is - // shutdown, - // test again after a message has been read. This is a bit of a hack. I - // think this thread should be interrupted, but currently AcceptorImpl - // doesn't keep track of the threads that it launches. + // Since this thread is not interrupted when the cache server is shutdown, test again after a message has been read. This is a bit of a hack. I think this thread should be interrupted, but currently AcceptorImpl doesn't keep track of the threads that it launches. 
if (!this.processMessages || (crHelper.isShutdown())) { if (logger.isDebugEnabled()) { logger.debug("{} ignoring message of type {} from client {} due to shutdown.", @@ -1078,8 +1073,6 @@ public class ServerConnection implements Runnable { */ public Part updateAndGetSecurityPart() { // need to take care all message types here - // this.logger.fine("getSecurityPart() msgType = " - // + this.requestMsg.msgType); if (AcceptorImpl.isAuthenticationRequired() && this.handshake.getVersion().compareTo(Version.GFE_65) >= 0 && (this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY) @@ -1090,40 +1083,40 @@ public class ServerConnection implements Runnable { if (AcceptorImpl.isAuthenticationRequired() && logger.isDebugEnabled()) { logger.debug( "ServerConnection.updateAndGetSecurityPart() not adding security part for msg type {}", - MessageType.getString(this.requestMsg.msgType)); + MessageType.getString(this.requestMsg.messageType)); } } return null; } private boolean isInternalMessage() { - return (this.requestMsg.msgType == MessageType.CLIENT_READY - || this.requestMsg.msgType == MessageType.CLOSE_CONNECTION - || this.requestMsg.msgType == MessageType.GETCQSTATS_MSG_TYPE - || this.requestMsg.msgType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES - || this.requestMsg.msgType == MessageType.GET_CLIENT_PR_METADATA - || this.requestMsg.msgType == MessageType.INVALID - || this.requestMsg.msgType == MessageType.MAKE_PRIMARY - || this.requestMsg.msgType == MessageType.MONITORCQ_MSG_TYPE - || this.requestMsg.msgType == MessageType.PERIODIC_ACK - || this.requestMsg.msgType == MessageType.PING - || this.requestMsg.msgType == MessageType.REGISTER_DATASERIALIZERS - || this.requestMsg.msgType == MessageType.REGISTER_INSTANTIATORS - || this.requestMsg.msgType == MessageType.REQUEST_EVENT_VALUE - || this.requestMsg.msgType == MessageType.ADD_PDX_TYPE - || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_TYPE - || this.requestMsg.msgType == MessageType.GET_PDX_TYPE_BY_ID - || 
this.requestMsg.msgType == MessageType.SIZE - || this.requestMsg.msgType == MessageType.TX_FAILOVER - || this.requestMsg.msgType == MessageType.TX_SYNCHRONIZATION - || this.requestMsg.msgType == MessageType.GET_FUNCTION_ATTRIBUTES - || this.requestMsg.msgType == MessageType.ADD_PDX_ENUM - || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_ENUM - || this.requestMsg.msgType == MessageType.GET_PDX_ENUM_BY_ID - || this.requestMsg.msgType == MessageType.GET_PDX_TYPES - || this.requestMsg.msgType == MessageType.GET_PDX_ENUMS - || this.requestMsg.msgType == MessageType.COMMIT - || this.requestMsg.msgType == MessageType.ROLLBACK); + return (this.requestMsg.messageType == MessageType.CLIENT_READY + || this.requestMsg.messageType == MessageType.CLOSE_CONNECTION + || this.requestMsg.messageType == MessageType.GETCQSTATS_MSG_TYPE + || this.requestMsg.messageType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES + || this.requestMsg.messageType == MessageType.GET_CLIENT_PR_METADATA + || this.requestMsg.messageType == MessageType.INVALID + || this.requestMsg.messageType == MessageType.MAKE_PRIMARY + || this.requestMsg.messageType == MessageType.MONITORCQ_MSG_TYPE + || this.requestMsg.messageType == MessageType.PERIODIC_ACK + || this.requestMsg.messageType == MessageType.PING + || this.requestMsg.messageType == MessageType.REGISTER_DATASERIALIZERS + || this.requestMsg.messageType == MessageType.REGISTER_INSTANTIATORS + || this.requestMsg.messageType == MessageType.REQUEST_EVENT_VALUE + || this.requestMsg.messageType == MessageType.ADD_PDX_TYPE + || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_TYPE + || this.requestMsg.messageType == MessageType.GET_PDX_TYPE_BY_ID + || this.requestMsg.messageType == MessageType.SIZE + || this.requestMsg.messageType == MessageType.TX_FAILOVER + || this.requestMsg.messageType == MessageType.TX_SYNCHRONIZATION + || this.requestMsg.messageType == MessageType.GET_FUNCTION_ATTRIBUTES + || this.requestMsg.messageType == 
MessageType.ADD_PDX_ENUM + || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_ENUM + || this.requestMsg.messageType == MessageType.GET_PDX_ENUM_BY_ID + || this.requestMsg.messageType == MessageType.GET_PDX_TYPES + || this.requestMsg.messageType == MessageType.GET_PDX_ENUMS + || this.requestMsg.messageType == MessageType.COMMIT + || this.requestMsg.messageType == MessageType.ROLLBACK); } public void run() { http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 4e450c7..1afe6ff 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -2149,7 +2149,7 @@ public class Connection implements Runnable { logger.fatal(LocalizedMessage .create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex); } - } else /* (msgType == END_CHUNKED_MSG_TYPE) */ { + } else /* (messageType == END_CHUNKED_MSG_TYPE) */ { MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion); this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len); try { http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java index 031f827..80b16fc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java +++ b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java @@ -30,8 +30,7 @@ import java.io.ObjectStreamClass; /** * Reusable Input/Output 
operation utility methods. - * <p/> - * + * * @since GemFire 6.6 */ @SuppressWarnings("unused") @@ -44,8 +43,7 @@ public abstract class IOUtils { * File.separator character. If the pathname is unspecified (null, empty or blank) then path * elements are considered relative to file system root, beginning with File.separator. If array * of path elements are null, then the pathname is returned as is. - * </p> - * + * * @param pathname a String value indicating the base pathname. * @param pathElements the path elements to append to pathname. * @return the path elements appended to the pathname. http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java index 86fcbce..b2d903c 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java @@ -32,53 +32,49 @@ import org.apache.geode.test.junit.categories.UnitTest; public class MessageJUnitTest { private Message message; - private Socket mockSocket; - private MessageStats mockStats; - private ByteBuffer msgBuffer; - private ServerConnection mockServerConnection; @Before public void setUp() throws Exception { - mockSocket = mock(Socket.class); - message = new Message(2, Version.CURRENT); - assertEquals(2, message.getNumberOfParts()); - mockStats = mock(MessageStats.class); - msgBuffer = ByteBuffer.allocate(1000); - mockServerConnection = mock(ServerConnection.class); - message.setComms(mockServerConnection, mockSocket, msgBuffer, mockStats); + Socket mockSocket = mock(Socket.class); + this.message = new Message(2, Version.CURRENT); 
+ assertEquals(2, this.message.getNumberOfParts()); + MessageStats mockStats = mock(MessageStats.class); + ByteBuffer msgBuffer = ByteBuffer.allocate(1000); + ServerConnection mockServerConnection = mock(ServerConnection.class); + this.message.setComms(mockServerConnection, mockSocket, msgBuffer, mockStats); } @Test public void clearDoesNotThrowNPE() throws Exception { // unsetComms clears the message's ByteBuffer, which was causing an NPE during shutdown // when clear() was invoked - message.unsetComms(); - message.clear(); + this.message.unsetComms(); + this.message.clear(); } @Test public void numberOfPartsIsAdjusted() { - int numParts = message.getNumberOfParts(); - message.setNumberOfParts(2 * numParts + 1); - assertEquals(2 * numParts + 1, message.getNumberOfParts()); - message.addBytesPart(new byte[1]); - message.addIntPart(2); - message.addLongPart(3); - message.addObjPart("4"); - message.addStringPart("5"); - assertEquals(5, message.getNextPartNumber()); + int numParts = this.message.getNumberOfParts(); + this.message.setNumberOfParts(2 * numParts + 1); + assertEquals(2 * numParts + 1, this.message.getNumberOfParts()); + this.message.addBytesPart(new byte[1]); + this.message.addIntPart(2); + this.message.addLongPart(3); + this.message.addObjPart("4"); + this.message.addStringPart("5"); + assertEquals(5, this.message.getNextPartNumber()); } @Test public void messageLongerThanMaxIntIsRejected() throws Exception { - Part[] parts = new Part[2]; Part mockPart1 = mock(Part.class); when(mockPart1.getLength()).thenReturn(Integer.MAX_VALUE / 2); + Part[] parts = new Part[2]; parts[0] = mockPart1; parts[1] = mockPart1; - message.setParts(parts); + this.message.setParts(parts); try { - message.send(); + this.message.send(); fail("expected an exception but none was thrown"); } catch (MessageTooLargeException e) { assertTrue(e.getMessage().contains("exceeds maximum integer value")); @@ -87,14 +83,14 @@ public class MessageJUnitTest { @Test public void 
maxMessageSizeIsRespected() throws Exception { - Part[] parts = new Part[2]; Part mockPart1 = mock(Part.class); - when(mockPart1.getLength()).thenReturn(Message.MAX_MESSAGE_SIZE / 2); + when(mockPart1.getLength()).thenReturn(Message.DEFAULT_MAX_MESSAGE_SIZE / 2); + Part[] parts = new Part[2]; parts[0] = mockPart1; parts[1] = mockPart1; - message.setParts(parts); + this.message.setParts(parts); try { - message.send(); + this.message.send(); fail("expected an exception but none was thrown"); } catch (MessageTooLargeException e) { assertFalse(e.getMessage().contains("exceeds maximum integer value")); @@ -103,21 +99,17 @@ public class MessageJUnitTest { /** * geode-1468: Message should clear the chunks in its Parts when performing cleanup. - * - * @throws Exception */ @Test public void streamBuffersAreClearedDuringCleanup() throws Exception { - Part[] parts = new Part[2]; Part mockPart1 = mock(Part.class); when(mockPart1.getLength()).thenReturn(100); + Part[] parts = new Part[2]; parts[0] = mockPart1; parts[1] = mockPart1; - message.setParts(parts); - message.clearParts(); + this.message.setParts(parts); + this.message.clearParts(); verify(mockPart1, times(2)).clear(); } - // TODO many more tests are needed - } http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java index 5a679bb..110d649 100644 --- a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java +++ b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java @@ -600,11 +600,11 @@ public abstract class JUnit4DistributedTestCase implements DistributedTestFixtur 
RegionTestCase.preSnapshotRegion = null; SocketCreator.resetHostNameCache(); SocketCreator.resolve_dns = true; - Message.MAX_MESSAGE_SIZE = Message.DEFAULT_MAX_MESSAGE_SIZE; // clear system properties -- keep alphabetized System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + "log-level"); System.clearProperty("jgroups.resolve_dns"); + System.clearProperty(Message.MAX_MESSAGE_SIZE_PROPERTY); if (InternalDistributedSystem.systemAttemptingReconnect != null) { InternalDistributedSystem.systemAttemptingReconnect.stopReconnecting(); http://git-wip-us.apache.org/repos/asf/geode/blob/d785ca31/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index f403447..8cedbf0 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -14,8 +14,10 @@ */ package org.apache.geode.internal.cache.wan.parallel; +import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY; import static org.apache.geode.test.dunit.Assert.*; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -31,8 +33,8 @@ import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.RMIException; import org.apache.geode.test.dunit.Wait; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; import org.apache.geode.test.junit.categories.DistributedTest; -import 
org.apache.geode.test.junit.categories.FlakyTest; /** * DUnit test for operations on ParallelGatewaySender @@ -40,6 +42,9 @@ import org.apache.geode.test.junit.categories.FlakyTest; @Category(DistributedTest.class) public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties(); + @Override protected final void postSetUpWANTestBase() throws Exception { IgnoredException.addIgnoredException("Broken pipe||Unexpected IOException"); @@ -582,13 +587,14 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { @Test public void testParallelGatewaySenderMessageTooLargeException() { + vm4.invoke(() -> System.setProperty(MAX_MESSAGE_SIZE_PROPERTY, String.valueOf(1024 * 1024))); + Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; // Create and start sender with reduced maximum message size and 1 dispatcher thread String regionName = getTestMethodName() + "_PR"; - vm4.invoke(() -> setMaximumMessageSize(1024 * 1024)); vm4.invoke(() -> createCache(lnPort)); vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1)); vm4.invoke(() -> createSender("ln", 2, true, 100, 100, false, false, null, false)); @@ -617,12 +623,6 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { ignoredGIOE.remove(); } - private void setMaximumMessageSize(int maximumMessageSizeBytes) { - Message.MAX_MESSAGE_SIZE = maximumMessageSizeBytes; - LogWriterUtils.getLogWriter().info("Set gemfire.client.max-message-size: " - + System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size")); - } - private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, boolean createAccessors, boolean startSenders) { // Note: This is a test-specific method used by several test to create