ACCUMULO-2671 Refactoring BlockedOutputStream to not recurse. With test
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/17344890 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/17344890 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/17344890 Branch: refs/heads/master Commit: 173448903a4b3eb5e755421428e65a4a4dd67de5 Parents: cacb1b6 Author: John Vines <vi...@apache.org> Authored: Thu Apr 17 17:57:32 2014 -0400 Committer: John Vines <vi...@apache.org> Committed: Thu Apr 17 17:57:32 2014 -0400 ---------------------------------------------------------------------- .../security/crypto/BlockedOutputStream.java | 17 ++++---- .../security/crypto/BlockedIOStreamTest.java | 44 +++++++++++++++----- 2 files changed, 44 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/17344890/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java index ca72055..3ce648e 100644 --- a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java @@ -72,15 +72,18 @@ public class BlockedOutputStream extends OutputStream { @Override public void write(byte b[], int off, int len) throws IOException { - if (bb.remaining() >= len) { - bb.put(b, off, len); - if (bb.remaining() == 0) - flush(); - } else { + // Can't recurse here in case the len is large and the blocksize is small (and the stack is small) + // So we'll just fill up the buffer over and over + while (len >= bb.remaining()) { int remaining = bb.remaining(); - write(b, off, remaining); - write(b, off + remaining, len - 
remaining); + bb.put(b, off, remaining); + // This is guaranteed to have the buffer filled, so we'll just flush it. No check needed + flush(); + off += remaining; + len -= remaining; } + // And then write the remainder (and this is guaranteed to not fill the buffer, so we won't flush afterward) + bb.put(b, off, len); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/17344890/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java index b344fc3..a116110 100644 --- a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java +++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java @@ -70,49 +70,73 @@ public class BlockedIOStreamTest { @Test public void testSmallBufferBlockedIO() throws IOException { - writeRead(16, (12 + 4) * (int) (Math.ceil(25.0/12) + Math.ceil(31.0/12))); + writeRead(16, (12 + 4) * (int) (Math.ceil(25.0 / 12) + Math.ceil(31.0 / 12))); } - + @Test public void testSpillingOverOutputStream() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); // buffer will be size 12 BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16); Random r = new Random(22); - + byte[] undersized = new byte[11]; byte[] perfectSized = new byte[12]; byte[] overSized = new byte[13]; byte[] perfectlyOversized = new byte[13]; byte filler = (byte) r.nextInt(); - + r.nextBytes(undersized); r.nextBytes(perfectSized); r.nextBytes(overSized); r.nextBytes(perfectlyOversized); - + // 1 block blockOut.write(undersized); blockOut.write(filler); blockOut.flush(); - + // 2 blocks blockOut.write(perfectSized); blockOut.write(filler); blockOut.flush(); - + // 2 blocks blockOut.write(overSized); 
blockOut.write(filler); blockOut.flush(); - + // 3 blocks blockOut.write(undersized); blockOut.write(perfectlyOversized); blockOut.write(filler); blockOut.flush(); - + + blockOut.close(); + assertEquals(16 * 8, baos.toByteArray().length); + } + + @Test + public void testGiantWrite() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int blockSize = 16; + // buffer will be size 12 + BlockedOutputStream blockOut = new BlockedOutputStream(baos, blockSize, blockSize); + Random r = new Random(22); + + int size = 1024 * 1024 * 128; + byte[] giant = new byte[size]; + + r.nextBytes(giant); + + blockOut.write(giant); + blockOut.flush(); + blockOut.close(); - assertEquals(16*8, baos.toByteArray().length); + baos.close(); + + int blocks = (int) Math.ceil(size / (blockSize - 4.0)); + assertEquals(blocks * 16, baos.toByteArray().length); } + }