Repository: incubator-geode Updated Branches: refs/heads/develop 711fc3518 -> 61ad7e445
GEODE-1468 client/server messaging can create large objects After a Message has been sent we invoke clear() on each Part contained by the Message. This was nulling out the "part" variable of the Part objects but if one of these "parts" was a HeapDataOutputStream it might hold a list of large buffers. This change set alters Part to close these streams so that their buffers can be cleared. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/61ad7e44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/61ad7e44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/61ad7e44 Branch: refs/heads/develop Commit: 61ad7e4451aa5504c2f5d92b5d41ce1ffbcec239 Parents: 711fc35 Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Fri Jun 3 08:42:00 2016 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Fri Jun 3 08:43:59 2016 -0700 ---------------------------------------------------------------------- .../gemfire/internal/HeapDataOutputStream.java | 5 ++- .../internal/cache/tier/sockets/Message.java | 33 ++++++++++---------- .../internal/cache/tier/sockets/Part.java | 7 ++++- .../cache/tier/sockets/MessageJUnitTest.java | 18 +++++++++++ 4 files changed, 45 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/61ad7e44/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java old mode 100644 new mode 100755 index 4bf39b6..eaad26e --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java @@ -366,7 +366,10 @@ public class HeapDataOutputStream extends OutputStream implements public final void reset() { this.size = 0; - this.chunks = null; + if (this.chunks != null) { + this.chunks.clear(); + this.chunks = null; + } this.buffer.clear(); this.writeMode = true; this.ignoreWrites = false; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/61ad7e44/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java index 139ccde..459cf5f 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java @@ -513,18 +513,21 @@ public class Message { // Keep track of the fact that we are making progress. this.sc.updateProcessingMessage(); } - if (this.socket != null) { + if (this.socket == null) { + throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString()); + } + try { final ByteBuffer cb = getCommBuffer(); if (cb == null) { throw new IOException("No buffer"); } int msgLen = 0; - synchronized(cb) { + synchronized (cb) { long totalPartLen = 0; long headerLen = 0; int partsToTransmit = this.numberOfParts; - - for (int i=0; i < this.numberOfParts; i++) { + + for (int i = 0; i < this.numberOfParts; i++) { Part part = this.partsList[i]; headerLen += PART_HEADER_SIZE; totalPartLen += part.getLength(); @@ -540,27 +543,27 @@ public class Message { partsToTransmit++; } - if ( (headerLen + totalPartLen) > Integer.MAX_VALUE ) { - throw new MessageTooLargeException("Message size (" + (headerLen + totalPartLen) + if ((headerLen + totalPartLen) > Integer.MAX_VALUE) { + throw new MessageTooLargeException("Message size (" + (headerLen + totalPartLen) + ") exceeds maximum integer value"); } - - msgLen = (int)(headerLen + totalPartLen); - + + msgLen = (int) (headerLen + totalPartLen); + if (msgLen > MAX_MESSAGE_SIZE) { throw new MessageTooLargeException("Message size (" + msgLen + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")"); } - + cb.clear(); packHeaderInfoForSending(msgLen, (securityPart != null)); - for (int i=0; i < partsToTransmit; i++) { + for (int i = 0; i < partsToTransmit; i++) { Part part = (i == this.numberOfParts) ? securityPart : partsList[i]; if (cb.remaining() < PART_HEADER_SIZE) { flushBuffer(); } - + int partLen = part.getLength(); cb.putInt(partLen); cb.put(part.getTypeCode()); @@ -586,13 +589,11 @@ public class Message { this.os.flush(); } } - if(clearMessage) { + } finally { + if (clearMessage) { clearParts(); } } - else { - throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString()); - } } protected void flushBuffer() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/61ad7e44/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java index 5e52f96..1c3819e 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java @@ -67,7 +67,12 @@ public class Part { public void clear() { - this.part = null; + if (this.part != null) { + if (this.part instanceof HeapDataOutputStream) { + ((HeapDataOutputStream)this.part).close(); + } + this.part = null; + } this.typeCode = BYTE_CODE; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/61ad7e44/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java index 9f05aa7..24f665f 100755 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java @@ -102,6 +102,24 @@ public class MessageJUnitTest { } } + /** + * geode-1468: Message should clear the chunks in its Parts when + * performing cleanup. + * + * @throws Exception + */ + @Test + public void streamBuffersAreClearedDuringCleanup() throws Exception { + Part[] parts = new Part[2]; + Part mockPart1 = mock(Part.class); + when(mockPart1.getLength()).thenReturn(100); + parts[0] = mockPart1; + parts[1] = mockPart1; + message.setParts(parts); + message.clearParts(); + verify(mockPart1, times(2)).clear(); + } + // TODO many more tests are needed }