[FLINK-2869] [tests] Port IOManagerPerformanceBenchmark to JMH. This closes #1270.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3abbcd1e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3abbcd1e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3abbcd1e Branch: refs/heads/master Commit: 3abbcd1eed42c2f7a65243ec6f27d16dbfc235b5 Parents: b654e98 Author: gallenvara <gallenv...@126.com> Authored: Tue Oct 20 09:49:34 2015 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Oct 29 10:23:22 2015 +0100 ---------------------------------------------------------------------- .../IOManagerPerformanceBenchmark.java | 608 +++++++++++++++++++ .../IOManagerPerformanceBenchmark.java | 415 ------------- 2 files changed, 608 insertions(+), 415 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3abbcd1e/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java new file mode 100644 index 0000000..7d31925 --- /dev/null +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.benchmark.runtime.io.disk.iomanager; + +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; +import org.apache.flink.runtime.io.disk.iomanager.*; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.types.IntValue; +import org.junit.Assert; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class IOManagerPerformanceBenchmark { + + private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class); + + @Param({"4096", "16384", "524288"}) + private int segmentSizesAligned; + + @Param({"3862", "16895", "500481"}) + private int segmentSizesUnaligned; + + @Param({"1", "2", "4", "6"}) + private int numSegment; + + private static int numBlocks; + + private static final long MEMORY_SIZE = 32 * 1024 * 1024; + + private static final int NUM_INTS_WRITTEN = 100000000; + + private static final AbstractInvokable memoryOwner = new DummyInvokable(); + + private MemoryManager memManager; + + private IOManager ioManager; + + private static FileIOChannel.ID fileIOChannel; + + private static File ioManagerTempFile1; + + private static File ioManagerTempFile2; + + private static File speedTestNIOTempFile1; + + private static File speedTestNIOTempFile2; + + private static File speedTestNIOTempFile3; + + private static File speedTestNIOTempFile4; + + + @Setup + public void startup() throws Exception { + memManager = new MemoryManager(MEMORY_SIZE, 1); + ioManager = new IOManagerAsync(); + testChannelWriteWithSegments(numSegment); + ioManagerTempFile1 = createReadTempFile(segmentSizesAligned); + ioManagerTempFile2 = createReadTempFile(segmentSizesUnaligned); + speedTestNIOTempFile1 = createSpeedTestNIOTempFile(segmentSizesAligned, true); + speedTestNIOTempFile2 = createSpeedTestNIOTempFile(segmentSizesAligned, false); + speedTestNIOTempFile3 = createSpeedTestNIOTempFile(segmentSizesUnaligned, true); + speedTestNIOTempFile4 = createSpeedTestNIOTempFile(segmentSizesUnaligned, false); + } + + @TearDown + public void afterTest() throws Exception { + ioManagerTempFile1.delete(); + ioManagerTempFile2.delete(); + speedTestNIOTempFile1.delete(); + speedTestNIOTempFile2.delete(); + speedTestNIOTempFile3.delete(); + speedTestNIOTempFile4.delete(); + ioManager.shutdown(); + Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); + + Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memManager.verifyEmpty()); + memManager.shutdown(); + memManager = null; + } + +// ------------------------------------------------------------------------ + + private File createReadTempFile(int bufferSize) throws IOException { + final FileIOChannel.ID tmpChannel = ioManager.createChannel(); + final IntValue rec = new IntValue(0); + + File tempFile = null; + DataOutputStream daos = null; + + try { + tempFile = new File(tmpChannel.getPath()); + + FileOutputStream fos = new FileOutputStream(tempFile); + daos = new DataOutputStream(new BufferedOutputStream(fos, bufferSize)); + + int valsLeft = NUM_INTS_WRITTEN; + while (valsLeft-- > 0) { + rec.setValue(valsLeft); + rec.write(new OutputViewDataOutputStreamWrapper(daos)); + } + daos.close(); + daos = null; + } + finally { + // close if possible + if (daos != null) { + daos.close(); + } + } + return tempFile; + } + + @SuppressWarnings("resource") + private File createSpeedTestNIOTempFile(int bufferSize, boolean direct) throws IOException + { + final FileIOChannel.ID tmpChannel = ioManager.createChannel(); + + File tempFile = null; + FileChannel fs = null; + + try { + tempFile = new File(tmpChannel.getPath()); + + RandomAccessFile raf = new RandomAccessFile(tempFile, "rw"); + fs = raf.getChannel(); + + ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize); + + int valsLeft = NUM_INTS_WRITTEN; + while (valsLeft-- > 0) { + if (buf.remaining() < 4) { + buf.flip(); + fs.write(buf); + buf.clear(); + } + buf.putInt(valsLeft); + } + + if (buf.position() > 0) { + buf.flip(); + fs.write(buf); + } + + fs.close(); + raf.close(); + fs = null; + } + finally { + // close if possible + if (fs != null) { + fs.close(); + fs = null; + } + } + return tempFile; + } + + @Benchmark + public void speedTestOutputManager() throws Exception + { + LOG.info("Starting speed test with IO Manager..."); + + testChannelWriteWithSegments(numSegment); + } + + @Benchmark + public void speedTestInputManager() throws Exception + { + LOG.info("Starting speed test with IO Manager..."); + + testChannelReadWithSegments(numSegment); + } + + private void testChannelWriteWithSegments(int numSegments) throws Exception + { + final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments); + final FileIOChannel.ID channel = this.ioManager.createChannel(); + + BlockChannelWriter<MemorySegment> writer = null; + + try { + writer = this.ioManager.createBlockChannelWriter(channel); + final ChannelWriterOutputView out = new ChannelWriterOutputView(writer, memory, this.memManager.getPageSize()); + + int valsLeft = NUM_INTS_WRITTEN; + while (valsLeft-- > 0) { + out.writeInt(valsLeft); + } + + fileIOChannel = channel; + out.close(); + numBlocks = out.getBlockCount(); + + writer.close(); + writer = null; + + memManager.release(memory); + } + finally { + if (writer != null) { + writer.closeAndDelete(); + } + } + } + + private void testChannelReadWithSegments(int numSegments) throws Exception + { + final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments); + + BlockChannelReader<MemorySegment> reader = null; + + try { + reader = ioManager.createBlockChannelReader(fileIOChannel); + final ChannelReaderInputView in = new ChannelReaderInputView(reader, memory, numBlocks, false); + + int valsLeft = NUM_INTS_WRITTEN; + while (valsLeft-- > 0) { + in.readInt(); +// Assert.assertTrue(rec.getValue() == valsLeft); + } + + in.close(); + reader.close(); + reader = null; + + memManager.release(memory); + } + finally { + if (reader != null) { + reader.closeAndDelete(); + } + } + } + +// @Test +// public void speedTestRandomAccessFile() throws IOException { +// LOG.info("Starting speed test with java random access file ..."); +// +// Channel.ID tmpChannel = ioManager.createChannel(); +// File tempFile = null; +// RandomAccessFile raf = null; +// +// try { +// tempFile = new File(tmpChannel.getPath()); +// raf = new RandomAccessFile(tempFile, "rw"); +// +// IntegerRecord rec = new IntegerRecord(0); +// +// long writeStart = System.currentTimeMillis(); +// +// int valsLeft = NUM_INTS_WRITTEN; +// while (valsLeft-- > 0) { +// rec.setValue(valsLeft); +// rec.write(raf); +// } +// raf.close(); +// raf = null; +// +// long writeElapsed = System.currentTimeMillis() - writeStart; +// +// // ---------------------------------------------------------------- +// +// raf = new RandomAccessFile(tempFile, "r"); +// +// long readStart = System.currentTimeMillis(); +// +// valsLeft = NUM_INTS_WRITTEN; +// while (valsLeft-- > 0) { +// rec.read(raf); +// } +// raf.close(); +// raf = null; +// +// long readElapsed = System.currentTimeMillis() - readStart; +// +// +// LOG.info("Random Access File: write " + (writeElapsed / 1000) + " secs, read " + (readElapsed / 1000) + " secs."); +// } +// finally { +// // close if possible +// if (raf != null) { +// raf.close(); +// } +// +// // try to delete the file +// if (tempFile != null) { +// tempFile.delete(); +// } +// } +// } + + @Benchmark + public void speedOutputStreamWithBufferAligned() throws Exception + { + LOG.info("Starting speed test with java io file stream and ALIGNED buffer sizes ..."); + + speedOutputTestStream(segmentSizesAligned); + } + + @Benchmark + public void speedOutputStreamWithBufferUnaligned() throws Exception + { + LOG.info("Starting speed test with java io file stream and UNALIGNED buffer sizes ..."); + + speedOutputTestStream(segmentSizesUnaligned); + } + + @Benchmark + public void speedInputStreamWithBufferAligned() throws Exception + { + LOG.info("Starting speed test with java io file stream and ALIGNED buffer sizes ..."); + + speedInputTestStream(segmentSizesAligned); + } + + @Benchmark + public void speedInputStreamWithBufferUnaligned() throws Exception + { + LOG.info("Starting speed test with java io file stream and UNALIGNED buffer sizes ..."); + + speedInputTestStream(segmentSizesUnaligned); + } + + private void speedOutputTestStream(int bufferSize) throws IOException { + final FileIOChannel.ID tmpChannel = ioManager.createChannel(); + final IntValue rec = new IntValue(0); + + File tempFile = null; + DataOutputStream daos = null; + + try { + tempFile = new File(tmpChannel.getPath()); + + FileOutputStream fos = new FileOutputStream(tempFile); + daos = new DataOutputStream(new BufferedOutputStream(fos, bufferSize)); + + int valsLeft = NUM_INTS_WRITTEN; + while (valsLeft-- > 0) { + rec.setValue(valsLeft); + rec.write(new OutputViewDataOutputStreamWrapper(daos)); + } + daos.close(); + daos = null; + } + finally { + // close if possible + if (daos != null) { + daos.close(); + } + // try to delete the file + if (tempFile != null) { + tempFile.delete(); + } + } + } + + private void speedInputTestStream(int bufferSize) throws IOException { + final FileIOChannel.ID tmpChannel = ioManager.createChannel(); + final IntValue rec = new IntValue(0); + + File tempFile = null; + DataInputStream dais = null; + + if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize == 524288))) + { + tempFile = ioManagerTempFile1; + } + if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize == 500481))) + { + tempFile = ioManagerTempFile2; + } + + try { + FileInputStream fis = new FileInputStream(tempFile); + dais = new DataInputStream(new BufferedInputStream(fis, bufferSize)); + + int valsLeft = NUM_INTS_WRITTEN; + while (valsLeft-- > 0) { + rec.read(new InputViewDataInputStreamWrapper(dais)); + } + dais.close(); + dais = null; + } + finally { + // close if possible + if (dais != null) { + dais.close(); + } + } + } + + // ------------------------------------------------------------------------ + + @Benchmark + public void speedWriteIndirectAndBufferAligned() throws Exception + { + LOG.info("Starting speed test with java NIO heap buffers and ALIGNED buffer sizes ..."); + + speedWriteTestNIO(segmentSizesAligned, false); + } + + @Benchmark + public void speedWriteIndirectAndBufferUnaligned() throws Exception + { + LOG.info("Starting speed test with java NIO heap buffers and UNALIGNED buffer sizes ..."); + + speedWriteTestNIO(segmentSizesUnaligned, false); + } + + @Benchmark + public void speedWriteDirectAndBufferAligned() throws Exception + { + LOG.info("Starting speed test with java NIO direct buffers and ALIGNED buffer sizes ..."); + + speedWriteTestNIO(segmentSizesAligned, true); + } + + @Benchmark + public void speedWriteDirectAndBufferUnaligned() throws Exception + { + LOG.info("Starting speed test with java NIO direct buffers and UNALIGNED buffer sizes ..."); + + speedWriteTestNIO(segmentSizesUnaligned, true); + } + + @Benchmark + public void speedReadIndirectAndBufferAligned() throws Exception + { + LOG.info("Starting speed test with java NIO heap buffers and ALIGNED buffer sizes ..."); + + speedReadTestNIO(segmentSizesAligned, false); + } + + @Benchmark + public void speedReadIndirectAndBufferUnaligned() throws Exception + { + LOG.info("Starting speed test with java NIO heap buffers and UNALIGNED buffer sizes ..."); + + speedReadTestNIO(segmentSizesUnaligned, false); + } + + @Benchmark + public void speedReadDirectAndBufferAligned() throws Exception + { + LOG.info("Starting speed test with java NIO direct buffers and ALIGNED buffer sizes ..."); + + speedReadTestNIO(segmentSizesAligned, true); + } + + @Benchmark + public void speedReadDirectAndBufferUnaligned() throws Exception + { + LOG.info("Starting speed test with java NIO direct buffers and UNALIGNED buffer sizes ..."); + + speedReadTestNIO(segmentSizesUnaligned, true); + } + + + @SuppressWarnings("resource") + private void speedWriteTestNIO(int bufferSize, boolean direct) throws IOException + { + final FileIOChannel.ID tmpChannel = ioManager.createChannel(); + + File tempFile = null; + FileChannel fs = null; + + try { + tempFile = new File(tmpChannel.getPath()); + + RandomAccessFile raf = new RandomAccessFile(tempFile, "rw"); + fs = raf.getChannel(); + + ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize); + + int valsLeft = NUM_INTS_WRITTEN; + while (valsLeft-- > 0) { + if (buf.remaining() < 4) { + buf.flip(); + fs.write(buf); + buf.clear(); + } + buf.putInt(valsLeft); + } + + if (buf.position() > 0) { + buf.flip(); + fs.write(buf); + } + + fs.close(); + raf.close(); + fs = null; + } + finally { + // close if possible + if (fs != null) { + fs.close(); + fs = null; + } + // try to delete the file + if (tempFile != null) { + tempFile.delete(); + } + } + } + + @SuppressWarnings("resource") + private void speedReadTestNIO(int bufferSize, boolean direct) throws IOException + { + File tempFile = null; + FileChannel fs = null; + + if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize == 524288))&&(direct)) + { + tempFile = speedTestNIOTempFile1; + } + if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize == 524288))&&(!direct)) + { + tempFile = speedTestNIOTempFile2; + } + if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize == 500481))&&(direct)) + { + tempFile = speedTestNIOTempFile3; + } + if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize == 500481))&&(!direct)) + { + tempFile = speedTestNIOTempFile4; + } + + try { + ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize); + + RandomAccessFile raf = new RandomAccessFile(tempFile, "r"); + fs = raf.getChannel(); + buf.clear(); + + fs.read(buf); + buf.flip(); + + int valsLeft = NUM_INTS_WRITTEN; + while (valsLeft-- > 0) { + if (buf.remaining() < 4) { + buf.compact(); + fs.read(buf); + buf.flip(); + } + if (buf.getInt() != valsLeft) { + throw new IOException(); + } + } + + fs.close(); + raf.close(); + + } + finally { + // close if possible + if (fs != null) { + fs.close(); + fs = null; + } + } + } + + public static void main(String[] args) throws Exception { + Options opt = new OptionsBuilder() + .include(IOManagerPerformanceBenchmark.class.getSimpleName()) + .warmupIterations(2) + .measurementIterations(2) + .forks(1) + .build(); + new Runner(opt).run(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3abbcd1e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java deleted file mode 100644 index fd02623..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.disk.iomanager; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.List; - -import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.types.IntValue; -import org.junit.Assert; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memory.MemoryManager; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - - -public class IOManagerPerformanceBenchmark { - - private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class); - - private static final int[] SEGMENT_SIZES_ALIGNED = { 4096, 16384, 524288 }; - - private static final int[] SEGMENT_SIZES_UNALIGNED = { 3862, 16895, 500481 }; - - private static final int[] NUM_SEGMENTS = { 1, 2, 4, 6 }; - - private static final long MEMORY_SIZE = 32 * 1024 * 1024; - - private static final int NUM_INTS_WRITTEN = 100000000; - - - private static final AbstractInvokable memoryOwner = new DummyInvokable(); - - private MemoryManager memManager; - - private IOManager ioManager; - - - @Before - public void startup() { - memManager = new MemoryManager(MEMORY_SIZE, 1); - ioManager = new IOManagerAsync(); - } - - @After - public void afterTest() throws Exception { - ioManager.shutdown(); - Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); - - Assert.assertTrue("Not all memory was returned to the memory manager in the test.", memManager.verifyEmpty()); - memManager.shutdown(); - memManager = null; - } - -// ------------------------------------------------------------------------ - - @Test - public void speedTestIOManager() throws Exception - { - LOG.info("Starting speed test with IO Manager..."); - - for (int num : NUM_SEGMENTS) { - testChannelWithSegments(num); - } - } - - private void testChannelWithSegments(int numSegments) throws Exception - { - final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments); - final FileIOChannel.ID channel = this.ioManager.createChannel(); - - BlockChannelWriter<MemorySegment> writer = null; - BlockChannelReader<MemorySegment> reader = null; - - try { - writer = this.ioManager.createBlockChannelWriter(channel); - final ChannelWriterOutputView out = new ChannelWriterOutputView(writer, memory, this.memManager.getPageSize()); - - long writeStart = System.currentTimeMillis(); - - int valsLeft = NUM_INTS_WRITTEN; - while (valsLeft-- > 0) { - out.writeInt(valsLeft); - } - - out.close(); - final int numBlocks = out.getBlockCount(); - writer.close(); - writer = null; - - long writeElapsed = System.currentTimeMillis() - writeStart; - - // ---------------------------------------------------------------- - - reader = ioManager.createBlockChannelReader(channel); - final ChannelReaderInputView in = new ChannelReaderInputView(reader, memory, numBlocks, false); - - long readStart = System.currentTimeMillis(); - - valsLeft = NUM_INTS_WRITTEN; - while (valsLeft-- > 0) { - in.readInt(); -// Assert.assertTrue(rec.getValue() == valsLeft); - } - - in.close(); - reader.close(); - - long readElapsed = System.currentTimeMillis() - readStart; - - reader.deleteChannel(); - reader = null; - - LOG.info("IOManager with " + numSegments + " mem segments: write " + writeElapsed + " msecs, read " + readElapsed + " msecs."); - - memManager.release(memory); - } - finally { - if (reader != null) { - reader.closeAndDelete(); - } - if (writer != null) { - writer.closeAndDelete(); - } - } - } - -// @Test -// public void speedTestRandomAccessFile() throws IOException { -// LOG.info("Starting speed test with java random access file ..."); -// -// Channel.ID tmpChannel = ioManager.createChannel(); -// File tempFile = null; -// RandomAccessFile raf = null; -// -// try { -// tempFile = new File(tmpChannel.getPath()); -// raf = new RandomAccessFile(tempFile, "rw"); -// -// IntegerRecord rec = new IntegerRecord(0); -// -// long writeStart = System.currentTimeMillis(); -// -// int valsLeft = NUM_INTS_WRITTEN; -// while (valsLeft-- > 0) { -// rec.setValue(valsLeft); -// rec.write(raf); -// } -// raf.close(); -// raf = null; -// -// long writeElapsed = System.currentTimeMillis() - writeStart; -// -// // ---------------------------------------------------------------- -// -// raf = new RandomAccessFile(tempFile, "r"); -// -// long readStart = System.currentTimeMillis(); -// -// valsLeft = NUM_INTS_WRITTEN; -// while (valsLeft-- > 0) { -// rec.read(raf); -// } -// raf.close(); -// raf = null; -// -// long readElapsed = System.currentTimeMillis() - readStart; -// -// -// LOG.info("Random Access File: write " + (writeElapsed / 1000) + " secs, read " + (readElapsed / 1000) + " secs."); -// } -// finally { -// // close if possible -// if (raf != null) { -// raf.close(); -// } -// -// // try to delete the file -// if (tempFile != null) { -// tempFile.delete(); -// } -// } -// } - - @Test - public void speedTestFileStream() throws Exception - { - LOG.info("Starting speed test with java io file stream and ALIGNED buffer sizes ..."); - - for (int bufferSize : SEGMENT_SIZES_ALIGNED) - { - speedTestStream(bufferSize); - } - - LOG.info("Starting speed test with java io file stream and UNALIGNED buffer sizes ..."); - - for (int bufferSize : SEGMENT_SIZES_UNALIGNED) - { - speedTestStream(bufferSize); - } - - } - - private void speedTestStream(int bufferSize) throws IOException { - final FileIOChannel.ID tmpChannel = ioManager.createChannel(); - final IntValue rec = new IntValue(0); - - File tempFile = null; - DataOutputStream daos = null; - DataInputStream dais = null; - - try { - tempFile = new File(tmpChannel.getPath()); - - FileOutputStream fos = new FileOutputStream(tempFile); - daos = new DataOutputStream(new BufferedOutputStream(fos, bufferSize)); - - long writeStart = System.currentTimeMillis(); - - int valsLeft = NUM_INTS_WRITTEN; - while (valsLeft-- > 0) { - rec.setValue(valsLeft); - rec.write(new OutputViewDataOutputStreamWrapper(daos)); - } - daos.close(); - daos = null; - - long writeElapsed = System.currentTimeMillis() - writeStart; - - // ---------------------------------------------------------------- - - FileInputStream fis = new FileInputStream(tempFile); - dais = new DataInputStream(new BufferedInputStream(fis, bufferSize)); - - long readStart = System.currentTimeMillis(); - - valsLeft = NUM_INTS_WRITTEN; - while (valsLeft-- > 0) { - rec.read(new InputViewDataInputStreamWrapper(dais)); - } - dais.close(); - dais = null; - - long readElapsed = System.currentTimeMillis() - readStart; - - LOG.info("File-Stream with buffer " + bufferSize + ": write " + writeElapsed + " msecs, read " + readElapsed + " msecs."); - } - finally { - // close if possible - if (daos != null) { - daos.close(); - } - if (dais != null) { - dais.close(); - } - // try to delete the file - if (tempFile != null) { - tempFile.delete(); - } - } - } - - // ------------------------------------------------------------------------ - - @Test - public void speedTestNIO() throws Exception - { - LOG.info("Starting speed test with java NIO heap buffers and ALIGNED buffer sizes ..."); - - for (int bufferSize : SEGMENT_SIZES_ALIGNED) - { - speedTestNIO(bufferSize, false); - } - - LOG.info("Starting speed test with java NIO heap buffers and UNALIGNED buffer sizes ..."); - - for (int bufferSize : SEGMENT_SIZES_UNALIGNED) - { - speedTestNIO(bufferSize, false); - } - - LOG.info("Starting speed test with java NIO direct buffers and ALIGNED buffer sizes ..."); - - for (int bufferSize : SEGMENT_SIZES_ALIGNED) - { - speedTestNIO(bufferSize, true); - } - - LOG.info("Starting speed test with java NIO direct buffers and UNALIGNED buffer sizes ..."); - - for (int bufferSize : SEGMENT_SIZES_UNALIGNED) - { - speedTestNIO(bufferSize, true); - } - - } - - @SuppressWarnings("resource") - private void speedTestNIO(int bufferSize, boolean direct) throws IOException - { - final FileIOChannel.ID tmpChannel = ioManager.createChannel(); - - File tempFile = null; - FileChannel fs = null; - - try { - tempFile = new File(tmpChannel.getPath()); - - RandomAccessFile raf = new RandomAccessFile(tempFile, "rw"); - fs = raf.getChannel(); - - ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize); - - long writeStart = System.currentTimeMillis(); - - int valsLeft = NUM_INTS_WRITTEN; - while (valsLeft-- > 0) { - if (buf.remaining() < 4) { - buf.flip(); - fs.write(buf); - buf.clear(); - } - buf.putInt(valsLeft); - } - - if (buf.position() > 0) { - buf.flip(); - fs.write(buf); - } - - fs.close(); - raf.close(); - fs = null; - - long writeElapsed = System.currentTimeMillis() - writeStart; - - // ---------------------------------------------------------------- - - raf = new RandomAccessFile(tempFile, "r"); - fs = raf.getChannel(); - buf.clear(); - - long readStart = System.currentTimeMillis(); - - fs.read(buf); - buf.flip(); - - valsLeft = NUM_INTS_WRITTEN; - while (valsLeft-- > 0) { - if (buf.remaining() < 4) { - buf.compact(); - fs.read(buf); - buf.flip(); - } - if (buf.getInt() != valsLeft) { - throw new IOException(); - } - } - - fs.close(); - raf.close(); - - long readElapsed = System.currentTimeMillis() - readStart; - - LOG.info("NIO Channel with buffer " + bufferSize + ": write " + writeElapsed + " msecs, read " + readElapsed + " msecs."); - } - finally { - // close if possible - if (fs != null) { - fs.close(); - fs = null; - } - // try to delete the file - if (tempFile != null) { - tempFile.delete(); - } - } - } - -}