[FLINK-2869] [tests] Port IOManagerPerformanceBenchmark to JMH.

This closes #1270.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3abbcd1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3abbcd1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3abbcd1e

Branch: refs/heads/master
Commit: 3abbcd1eed42c2f7a65243ec6f27d16dbfc235b5
Parents: b654e98
Author: gallenvara <gallenv...@126.com>
Authored: Tue Oct 20 09:49:34 2015 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Oct 29 10:23:22 2015 +0100

----------------------------------------------------------------------
 .../IOManagerPerformanceBenchmark.java          | 608 +++++++++++++++++++
 .../IOManagerPerformanceBenchmark.java          | 415 -------------
 2 files changed, 608 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3abbcd1e/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
 
b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
new file mode 100644
index 0000000..7d31925
--- /dev/null
+++ 
b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.runtime.io.disk.iomanager.*;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.IntValue;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class IOManagerPerformanceBenchmark {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
+
+       @Param({"4096", "16384", "524288"})
+       private int segmentSizesAligned;
+
+       @Param({"3862", "16895", "500481"})
+       private int segmentSizesUnaligned;
+
+       @Param({"1", "2", "4", "6"})
+       private int numSegment;
+
+       private static int numBlocks;
+
+       private static final long MEMORY_SIZE = 32 * 1024 * 1024;
+
+       private static final int NUM_INTS_WRITTEN = 100000000;
+
+       private static final AbstractInvokable memoryOwner = new 
DummyInvokable();
+
+       private MemoryManager memManager;
+
+       private IOManager ioManager;
+
+       private static FileIOChannel.ID fileIOChannel;
+
+       private static File ioManagerTempFile1;
+
+       private static File ioManagerTempFile2;
+
+       private static File speedTestNIOTempFile1;
+
+       private static File speedTestNIOTempFile2;
+
+       private static File speedTestNIOTempFile3;
+
+       private static File speedTestNIOTempFile4;
+
+
+       @Setup
+       public void startup() throws Exception {
+               memManager = new MemoryManager(MEMORY_SIZE, 1);
+               ioManager = new IOManagerAsync();
+               testChannelWriteWithSegments(numSegment);
+               ioManagerTempFile1 = createReadTempFile(segmentSizesAligned);
+               ioManagerTempFile2 = createReadTempFile(segmentSizesUnaligned);
+               speedTestNIOTempFile1 = 
createSpeedTestNIOTempFile(segmentSizesAligned, true);
+               speedTestNIOTempFile2 = 
createSpeedTestNIOTempFile(segmentSizesAligned, false);
+               speedTestNIOTempFile3 = 
createSpeedTestNIOTempFile(segmentSizesUnaligned, true);
+               speedTestNIOTempFile4 = 
createSpeedTestNIOTempFile(segmentSizesUnaligned, false);
+       }
+
+       @TearDown
+       public void afterTest() throws Exception {
+               ioManagerTempFile1.delete();
+               ioManagerTempFile2.delete();
+               speedTestNIOTempFile1.delete();
+               speedTestNIOTempFile2.delete();
+               speedTestNIOTempFile3.delete();
+               speedTestNIOTempFile4.delete();
+               ioManager.shutdown();
+               Assert.assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
+
+               Assert.assertTrue("Not all memory was returned to the memory 
manager in the test.", memManager.verifyEmpty());
+               memManager.shutdown();
+               memManager = null;
+       }
+
+// ------------------------------------------------------------------------
+
+       private File createReadTempFile(int bufferSize) throws IOException {
+               final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+               final IntValue rec = new IntValue(0);
+
+               File tempFile = null;
+               DataOutputStream daos = null;
+
+               try {
+                       tempFile = new File(tmpChannel.getPath());
+
+                       FileOutputStream fos = new FileOutputStream(tempFile);
+                       daos = new DataOutputStream(new 
BufferedOutputStream(fos, bufferSize));
+
+                       int valsLeft = NUM_INTS_WRITTEN;
+                       while (valsLeft-- > 0) {
+                               rec.setValue(valsLeft);
+                               rec.write(new 
OutputViewDataOutputStreamWrapper(daos));
+                       }
+                       daos.close();
+                       daos = null;
+               }
+               finally {
+                       // close if possible
+                       if (daos != null) {
+                               daos.close();
+                       }
+               }
+               return tempFile;
+       }
+
+       @SuppressWarnings("resource")
+       private File createSpeedTestNIOTempFile(int bufferSize, boolean direct) 
throws IOException
+       {
+               final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+
+               File tempFile = null;
+               FileChannel fs = null;
+
+               try {
+                       tempFile = new File(tmpChannel.getPath());
+
+                       RandomAccessFile raf = new RandomAccessFile(tempFile, 
"rw");
+                       fs = raf.getChannel();
+
+                       ByteBuffer buf = direct ? 
ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
+                       int valsLeft = NUM_INTS_WRITTEN;
+                       while (valsLeft-- > 0) {
+                               if (buf.remaining() < 4) {
+                                       buf.flip();
+                                       fs.write(buf);
+                                       buf.clear();
+                               }
+                               buf.putInt(valsLeft);
+                       }
+
+                       if (buf.position() > 0) {
+                               buf.flip();
+                               fs.write(buf);
+                       }
+
+                       fs.close();
+                       raf.close();
+                       fs = null;
+               }
+               finally {
+                       // close if possible
+                       if (fs != null) {
+                               fs.close();
+                               fs = null;
+                       }
+               }
+               return tempFile;
+       }
+
+       @Benchmark
+       public void speedTestOutputManager() throws Exception
+       {
+               LOG.info("Starting speed test with IO Manager...");
+
+               testChannelWriteWithSegments(numSegment);
+       }
+
+       @Benchmark
+       public void speedTestInputManager() throws Exception
+       {
+               LOG.info("Starting speed test with IO Manager...");
+
+               testChannelReadWithSegments(numSegment);
+       }
+
+       private void testChannelWriteWithSegments(int numSegments) throws 
Exception
+       {
+               final List<MemorySegment> memory = 
this.memManager.allocatePages(memoryOwner, numSegments);
+               final FileIOChannel.ID channel = this.ioManager.createChannel();
+
+               BlockChannelWriter<MemorySegment> writer = null;
+
+               try {
+                       writer = 
this.ioManager.createBlockChannelWriter(channel);
+                       final ChannelWriterOutputView out = new 
ChannelWriterOutputView(writer, memory, this.memManager.getPageSize());
+
+                       int valsLeft = NUM_INTS_WRITTEN;
+                       while (valsLeft-- > 0) {
+                               out.writeInt(valsLeft);
+                       }
+
+                       fileIOChannel = channel;
+                       out.close();
+                       numBlocks = out.getBlockCount();
+
+                       writer.close();
+                       writer = null;
+
+                       memManager.release(memory);
+               }
+               finally {
+                       if (writer != null) {
+                               writer.closeAndDelete();
+                       }
+               }
+       }
+
+       private void testChannelReadWithSegments(int numSegments) throws 
Exception
+       {
+               final List<MemorySegment> memory = 
this.memManager.allocatePages(memoryOwner, numSegments);
+
+               BlockChannelReader<MemorySegment> reader = null;
+
+               try {
+                       reader = 
ioManager.createBlockChannelReader(fileIOChannel);
+                       final ChannelReaderInputView in = new 
ChannelReaderInputView(reader, memory, numBlocks, false);
+
+                       int valsLeft = NUM_INTS_WRITTEN;
+                       while (valsLeft-- > 0) {
+                               in.readInt();
+//                             Assert.assertTrue(rec.getValue() == valsLeft);
+                       }
+
+                       in.close();
+                       reader.close();
+                       reader = null;
+
+                       memManager.release(memory);
+               }
+               finally {
+                       if (reader != null) {
+                               reader.closeAndDelete();
+                       }
+               }
+       }
+
+//     @Test
+//     public void speedTestRandomAccessFile() throws IOException {
+//             LOG.info("Starting speed test with java random access file 
...");
+//
+//             Channel.ID tmpChannel = ioManager.createChannel();
+//             File tempFile = null;
+//             RandomAccessFile raf = null;
+//
+//             try {
+//                     tempFile = new File(tmpChannel.getPath());
+//                     raf = new RandomAccessFile(tempFile, "rw");
+//
+//                     IntegerRecord rec = new IntegerRecord(0);
+//
+//                     long writeStart = System.currentTimeMillis();
+//
+//                     int valsLeft = NUM_INTS_WRITTEN;
+//                     while (valsLeft-- > 0) {
+//                             rec.setValue(valsLeft);
+//                             rec.write(raf);
+//                     }
+//                     raf.close();
+//                     raf = null;
+//
+//                     long writeElapsed = System.currentTimeMillis() - 
writeStart;
+//
+//                     // 
----------------------------------------------------------------
+//
+//                     raf = new RandomAccessFile(tempFile, "r");
+//
+//                     long readStart = System.currentTimeMillis();
+//
+//                     valsLeft = NUM_INTS_WRITTEN;
+//                     while (valsLeft-- > 0) {
+//                             rec.read(raf);
+//                     }
+//                     raf.close();
+//                     raf = null;
+//
+//                     long readElapsed = System.currentTimeMillis() - 
readStart;
+//
+//
+//                     LOG.info("Random Access File: write " + (writeElapsed / 
1000) + " secs, read " + (readElapsed / 1000) + " secs.");
+//             }
+//             finally {
+//                     // close if possible
+//                     if (raf != null) {
+//                             raf.close();
+//                     }
+//
+//                     // try to delete the file
+//                     if (tempFile != null) {
+//                             tempFile.delete();
+//                     }
+//             }
+//     }
+
+       @Benchmark
+       public void speedOutputStreamWithBufferAligned() throws Exception
+       {
+               LOG.info("Starting speed test with java io file stream and 
ALIGNED buffer sizes ...");
+
+               speedOutputTestStream(segmentSizesAligned);
+       }
+
+       @Benchmark
+       public void speedOutputStreamWithBufferUnaligned() throws Exception
+       {
+               LOG.info("Starting speed test with java io file stream and 
UNALIGNED buffer sizes ...");
+
+               speedOutputTestStream(segmentSizesUnaligned);
+       }
+
+       @Benchmark
+       public void speedInputStreamWithBufferAligned() throws Exception
+       {
+               LOG.info("Starting speed test with java io file stream and 
ALIGNED buffer sizes ...");
+
+               speedInputTestStream(segmentSizesAligned);
+       }
+
+       @Benchmark
+       public void speedInputStreamWithBufferUnaligned() throws Exception
+       {
+               LOG.info("Starting speed test with java io file stream and 
UNALIGNED buffer sizes ...");
+
+               speedInputTestStream(segmentSizesUnaligned);
+       }
+
+       private void speedOutputTestStream(int bufferSize) throws IOException {
+               final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+               final IntValue rec = new IntValue(0);
+
+               File tempFile = null;
+               DataOutputStream daos = null;
+
+               try {
+                       tempFile = new File(tmpChannel.getPath());
+
+                       FileOutputStream fos = new FileOutputStream(tempFile);
+                       daos = new DataOutputStream(new 
BufferedOutputStream(fos, bufferSize));
+
+                       int valsLeft = NUM_INTS_WRITTEN;
+                       while (valsLeft-- > 0) {
+                               rec.setValue(valsLeft);
+                               rec.write(new 
OutputViewDataOutputStreamWrapper(daos));
+                       }
+                       daos.close();
+                       daos = null;
+               }
+               finally {
+                       // close if possible
+                       if (daos != null) {
+                               daos.close();
+                       }
+                       // try to delete the file
+                       if (tempFile != null) {
+                               tempFile.delete();
+                       }
+               }
+       }
+
+       private void speedInputTestStream(int bufferSize) throws IOException {
+               final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+               final IntValue rec = new IntValue(0);
+
+               File tempFile = null;
+               DataInputStream dais = null;
+
+               if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize 
== 524288)))
+               {
+                       tempFile = ioManagerTempFile1;
+               }
+               if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize 
== 500481)))
+               {
+                       tempFile = ioManagerTempFile2;
+               }
+
+               try {
+                       FileInputStream fis = new FileInputStream(tempFile);
+                       dais = new DataInputStream(new BufferedInputStream(fis, 
bufferSize));
+
+                       int valsLeft = NUM_INTS_WRITTEN;
+                       while (valsLeft-- > 0) {
+                               rec.read(new 
InputViewDataInputStreamWrapper(dais));
+                       }
+                       dais.close();
+                       dais = null;
+               }
+               finally {
+                       // close if possible
+                       if (dais != null) {
+                               dais.close();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Benchmark
+       public void speedWriteIndirectAndBufferAligned() throws Exception
+       {
+               LOG.info("Starting speed test with java NIO heap buffers and 
ALIGNED buffer sizes ...");
+
+               speedWriteTestNIO(segmentSizesAligned, false);
+       }
+
+       @Benchmark
+       public void speedWriteIndirectAndBufferUnaligned() throws Exception
+       {
+               LOG.info("Starting speed test with java NIO heap buffers and 
UNALIGNED buffer sizes ...");
+
+               speedWriteTestNIO(segmentSizesUnaligned, false);
+       }
+
+       @Benchmark
+       public void speedWriteDirectAndBufferAligned() throws Exception
+       {
+               LOG.info("Starting speed test with java NIO direct buffers and 
ALIGNED buffer sizes ...");
+
+               speedWriteTestNIO(segmentSizesAligned, true);
+       }
+
+       @Benchmark
+       public void speedWriteDirectAndBufferUnaligned() throws Exception
+       {
+               LOG.info("Starting speed test with java NIO direct buffers and 
UNALIGNED buffer sizes ...");
+
+               speedWriteTestNIO(segmentSizesUnaligned, true);
+       }
+
+       @Benchmark
+       public void speedReadIndirectAndBufferAligned() throws Exception
+       {
+               LOG.info("Starting speed test with java NIO heap buffers and 
ALIGNED buffer sizes ...");
+
+               speedReadTestNIO(segmentSizesAligned, false);
+       }
+
+       @Benchmark
+       public void speedReadIndirectAndBufferUnaligned() throws Exception
+       {
+               LOG.info("Starting speed test with java NIO heap buffers and 
UNALIGNED buffer sizes ...");
+
+               speedReadTestNIO(segmentSizesUnaligned, false);
+       }
+
+       @Benchmark
+       public void speedReadDirectAndBufferAligned() throws Exception
+       {
+               LOG.info("Starting speed test with java NIO direct buffers and 
ALIGNED buffer sizes ...");
+
+               speedReadTestNIO(segmentSizesAligned, true);
+       }
+
+       @Benchmark
+       public void speedReadDirectAndBufferUnaligned() throws Exception
+       {
+               LOG.info("Starting speed test with java NIO direct buffers and 
UNALIGNED buffer sizes ...");
+
+               speedReadTestNIO(segmentSizesUnaligned, true);
+       }
+
+
+       @SuppressWarnings("resource")
+       private void speedWriteTestNIO(int bufferSize, boolean direct) throws 
IOException
+       {
+               final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+
+               File tempFile = null;
+               FileChannel fs = null;
+
+               try {
+                       tempFile = new File(tmpChannel.getPath());
+
+                       RandomAccessFile raf = new RandomAccessFile(tempFile, 
"rw");
+                       fs = raf.getChannel();
+
+                       ByteBuffer buf = direct ? 
ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
+                       int valsLeft = NUM_INTS_WRITTEN;
+                       while (valsLeft-- > 0) {
+                               if (buf.remaining() < 4) {
+                                       buf.flip();
+                                       fs.write(buf);
+                                       buf.clear();
+                               }
+                               buf.putInt(valsLeft);
+                       }
+
+                       if (buf.position() > 0) {
+                               buf.flip();
+                               fs.write(buf);
+                       }
+
+                       fs.close();
+                       raf.close();
+                       fs = null;
+               }
+               finally {
+                       // close if possible
+                       if (fs != null) {
+                               fs.close();
+                               fs = null;
+                       }
+                       // try to delete the file
+                       if (tempFile != null) {
+                               tempFile.delete();
+                       }
+               }
+       }
+
+       @SuppressWarnings("resource")
+       private void speedReadTestNIO(int bufferSize, boolean direct) throws 
IOException
+       {
+               File tempFile = null;
+               FileChannel fs = null;
+
+               if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize 
== 524288))&&(direct))
+               {
+                       tempFile = speedTestNIOTempFile1;
+               }
+               if (((bufferSize == 4096)||(bufferSize == 16384)||(bufferSize 
== 524288))&&(!direct))
+               {
+                       tempFile = speedTestNIOTempFile2;
+               }
+               if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize 
== 500481))&&(direct))
+               {
+                       tempFile = speedTestNIOTempFile3;
+               }
+               if (((bufferSize == 3862)||(bufferSize == 16895)||(bufferSize 
== 500481))&&(!direct))
+               {
+                       tempFile = speedTestNIOTempFile4;
+               }
+
+               try {
+                       ByteBuffer buf = direct ? 
ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
+                       RandomAccessFile raf = new RandomAccessFile(tempFile, 
"r");
+                       fs = raf.getChannel();
+                       buf.clear();
+
+                       fs.read(buf);
+                       buf.flip();
+
+                       int valsLeft = NUM_INTS_WRITTEN;
+                       while (valsLeft-- > 0) {
+                               if (buf.remaining() < 4) {
+                                       buf.compact();
+                                       fs.read(buf);
+                                       buf.flip();
+                               }
+                               if (buf.getInt() != valsLeft) {
+                                       throw new IOException();
+                               }
+                       }
+
+                       fs.close();
+                       raf.close();
+
+               }
+               finally {
+                       // close if possible
+                       if (fs != null) {
+                               fs.close();
+                               fs = null;
+                       }
+               }
+       }
+
+       public static void main(String[] args) throws Exception {
+               Options opt = new OptionsBuilder()
+                               
.include(IOManagerPerformanceBenchmark.class.getSimpleName())
+                               .warmupIterations(2)
+                               .measurementIterations(2)
+                               .forks(1)
+                               .build();
+               new Runner(opt).run();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3abbcd1e/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
deleted file mode 100644
index fd02623..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.List;
-
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.types.IntValue;
-import org.junit.Assert;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class IOManagerPerformanceBenchmark {
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
-       
-       private static final int[] SEGMENT_SIZES_ALIGNED = { 4096, 16384, 
524288 };
-       
-       private static final int[] SEGMENT_SIZES_UNALIGNED = { 3862, 16895, 
500481 };
-       
-       private static final int[] NUM_SEGMENTS = { 1, 2, 4, 6 };
-       
-       private static final long MEMORY_SIZE = 32 * 1024 * 1024;
-       
-       private static final int NUM_INTS_WRITTEN = 100000000;
-       
-       
-       private static final AbstractInvokable memoryOwner = new 
DummyInvokable();
-       
-       private MemoryManager memManager;
-       
-       private IOManager ioManager;
-       
-       
-       @Before
-       public void startup() {
-               memManager = new MemoryManager(MEMORY_SIZE, 1);
-               ioManager = new IOManagerAsync();
-       }
-       
-       @After
-       public void afterTest() throws Exception {
-               ioManager.shutdown();
-               Assert.assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
-               
-               Assert.assertTrue("Not all memory was returned to the memory 
manager in the test.", memManager.verifyEmpty());
-               memManager.shutdown();
-               memManager = null;
-       }
-       
-// ------------------------------------------------------------------------
-       
-       @Test
-       public void speedTestIOManager() throws Exception
-       {
-               LOG.info("Starting speed test with IO Manager...");
-               
-               for (int num : NUM_SEGMENTS) {
-                       testChannelWithSegments(num);
-               }
-       }
-
-       private void testChannelWithSegments(int numSegments) throws Exception
-       {
-               final List<MemorySegment> memory = 
this.memManager.allocatePages(memoryOwner, numSegments);
-               final FileIOChannel.ID channel = this.ioManager.createChannel();
-               
-               BlockChannelWriter<MemorySegment> writer = null;
-               BlockChannelReader<MemorySegment> reader = null;
-               
-               try {   
-                       writer = 
this.ioManager.createBlockChannelWriter(channel);
-                       final ChannelWriterOutputView out = new 
ChannelWriterOutputView(writer, memory, this.memManager.getPageSize());
-                       
-                       long writeStart = System.currentTimeMillis();
-                       
-                       int valsLeft = NUM_INTS_WRITTEN;
-                       while (valsLeft-- > 0) {
-                               out.writeInt(valsLeft);
-                       }
-                       
-                       out.close();
-                       final int numBlocks = out.getBlockCount();
-                       writer.close();
-                       writer = null;
-                       
-                       long writeElapsed = System.currentTimeMillis() - 
writeStart;
-                       
-                       // 
----------------------------------------------------------------
-                       
-                       reader = ioManager.createBlockChannelReader(channel);
-                       final ChannelReaderInputView in = new 
ChannelReaderInputView(reader, memory, numBlocks, false);
-                       
-                       long readStart = System.currentTimeMillis();
-                       
-                       valsLeft = NUM_INTS_WRITTEN;
-                       while (valsLeft-- > 0) {
-                               in.readInt();
-//                             Assert.assertTrue(rec.getValue() == valsLeft);
-                       }
-                       
-                       in.close();
-                       reader.close();
-                       
-                       long readElapsed = System.currentTimeMillis() - 
readStart;
-                       
-                       reader.deleteChannel();
-                       reader = null;
-                       
-                       LOG.info("IOManager with " + numSegments + " mem 
segments: write " + writeElapsed + " msecs, read " + readElapsed + " msecs.");
-                       
-                       memManager.release(memory);
-               }
-               finally {
-                       if (reader != null) {
-                               reader.closeAndDelete();
-                       }
-                       if (writer != null) {
-                               writer.closeAndDelete();
-                       }
-               }
-       }
-
-//     @Test
-//     public void speedTestRandomAccessFile() throws IOException {
-//             LOG.info("Starting speed test with java random access file 
...");
-//             
-//             Channel.ID tmpChannel = ioManager.createChannel();
-//             File tempFile = null;
-//             RandomAccessFile raf = null;
-//             
-//             try {
-//                     tempFile = new File(tmpChannel.getPath()); 
-//                     raf = new RandomAccessFile(tempFile, "rw");
-//                     
-//                     IntegerRecord rec = new IntegerRecord(0);
-//                     
-//                     long writeStart = System.currentTimeMillis();
-//                     
-//                     int valsLeft = NUM_INTS_WRITTEN;
-//                     while (valsLeft-- > 0) {
-//                             rec.setValue(valsLeft);
-//                             rec.write(raf);
-//                     }
-//                     raf.close();
-//                     raf = null;
-//                     
-//                     long writeElapsed = System.currentTimeMillis() - 
writeStart;
-//                     
-//                     // 
----------------------------------------------------------------
-//                     
-//                     raf = new RandomAccessFile(tempFile, "r");
-//                     
-//                     long readStart = System.currentTimeMillis();
-//                     
-//                     valsLeft = NUM_INTS_WRITTEN;
-//                     while (valsLeft-- > 0) {
-//                             rec.read(raf);
-//                     }
-//                     raf.close();
-//                     raf = null;
-//                     
-//                     long readElapsed = System.currentTimeMillis() - 
readStart;
-//                     
-//                     
-//                     LOG.info("Random Access File: write " + (writeElapsed / 
1000) + " secs, read " + (readElapsed / 1000) + " secs.");
-//             }
-//             finally {
-//                     // close if possible
-//                     if (raf != null) {
-//                             raf.close();
-//                     }
-//                     
-//                     // try to delete the file
-//                     if (tempFile != null) {
-//                             tempFile.delete();
-//                     }
-//             }
-//     }
-
-       @Test
-       public void speedTestFileStream() throws Exception
-       {
-               LOG.info("Starting speed test with java io file stream and 
ALIGNED buffer sizes ...");
-               
-               for (int bufferSize : SEGMENT_SIZES_ALIGNED)
-               {
-                       speedTestStream(bufferSize);
-               }
-               
-               LOG.info("Starting speed test with java io file stream and 
UNALIGNED buffer sizes ...");
-               
-               for (int bufferSize : SEGMENT_SIZES_UNALIGNED)
-               {
-                       speedTestStream(bufferSize);
-               }
-               
-       }
-               
-       private void speedTestStream(int bufferSize) throws IOException {
-               final FileIOChannel.ID tmpChannel = ioManager.createChannel();
-               final IntValue rec = new IntValue(0);
-               
-               File tempFile = null;
-               DataOutputStream daos = null;
-               DataInputStream dais = null;
-               
-               try {
-                       tempFile = new File(tmpChannel.getPath());
-                       
-                       FileOutputStream fos = new FileOutputStream(tempFile);
-                       daos = new DataOutputStream(new 
BufferedOutputStream(fos, bufferSize));
-                       
-                       long writeStart = System.currentTimeMillis();
-                       
-                       int valsLeft = NUM_INTS_WRITTEN;
-                       while (valsLeft-- > 0) {
-                               rec.setValue(valsLeft);
-                               rec.write(new 
OutputViewDataOutputStreamWrapper(daos));
-                       }
-                       daos.close();
-                       daos = null;
-                       
-                       long writeElapsed = System.currentTimeMillis() - 
writeStart;
-                       
-                       // 
----------------------------------------------------------------
-                       
-                       FileInputStream fis = new FileInputStream(tempFile);
-                       dais = new DataInputStream(new BufferedInputStream(fis, 
bufferSize));
-                       
-                       long readStart = System.currentTimeMillis();
-                       
-                       valsLeft = NUM_INTS_WRITTEN;
-                       while (valsLeft-- > 0) {
-                               rec.read(new 
InputViewDataInputStreamWrapper(dais));
-                       }
-                       dais.close();
-                       dais = null;
-                       
-                       long readElapsed = System.currentTimeMillis() - 
readStart;
-                       
-                       LOG.info("File-Stream with buffer " + bufferSize + ": 
write " + writeElapsed + " msecs, read " + readElapsed + " msecs.");
-               }
-               finally {
-                       // close if possible
-                       if (daos != null) {
-                               daos.close();
-                       }
-                       if (dais != null) {
-                               dais.close();
-                       }
-                       // try to delete the file
-                       if (tempFile != null) {
-                               tempFile.delete();
-                       }
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       @Test
-       public void speedTestNIO() throws Exception
-       {
-               LOG.info("Starting speed test with java NIO heap buffers and 
ALIGNED buffer sizes ...");
-               
-               for (int bufferSize : SEGMENT_SIZES_ALIGNED)
-               {
-                       speedTestNIO(bufferSize, false);
-               }
-               
-               LOG.info("Starting speed test with java NIO heap buffers and 
UNALIGNED buffer sizes ...");
-               
-               for (int bufferSize : SEGMENT_SIZES_UNALIGNED)
-               {
-                       speedTestNIO(bufferSize, false);
-               }
-               
-               LOG.info("Starting speed test with java NIO direct buffers and 
ALIGNED buffer sizes ...");
-               
-               for (int bufferSize : SEGMENT_SIZES_ALIGNED)
-               {
-                       speedTestNIO(bufferSize, true);
-               }
-               
-               LOG.info("Starting speed test with java NIO direct buffers and 
UNALIGNED buffer sizes ...");
-               
-               for (int bufferSize : SEGMENT_SIZES_UNALIGNED)
-               {
-                       speedTestNIO(bufferSize, true);
-               }
-               
-       }
-               
-       @SuppressWarnings("resource")
-       private void speedTestNIO(int bufferSize, boolean direct) throws 
IOException
-       {
-               final FileIOChannel.ID tmpChannel = ioManager.createChannel();
-               
-               File tempFile = null;
-               FileChannel fs = null;
-               
-               try {
-                       tempFile = new File(tmpChannel.getPath());
-                       
-                       RandomAccessFile raf = new RandomAccessFile(tempFile, 
"rw");
-                       fs = raf.getChannel();
-                       
-                       ByteBuffer buf = direct ? 
ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
-                       
-                       long writeStart = System.currentTimeMillis();
-                       
-                       int valsLeft = NUM_INTS_WRITTEN;
-                       while (valsLeft-- > 0) {
-                               if (buf.remaining() < 4) {
-                                       buf.flip();
-                                       fs.write(buf);
-                                       buf.clear();
-                               }
-                               buf.putInt(valsLeft);
-                       }
-                       
-                       if (buf.position() > 0) {
-                               buf.flip();
-                               fs.write(buf);
-                       }
-                       
-                       fs.close();
-                       raf.close();
-                       fs = null;
-                       
-                       long writeElapsed = System.currentTimeMillis() - 
writeStart;
-                       
-                       // 
----------------------------------------------------------------
-                       
-                       raf = new RandomAccessFile(tempFile, "r");
-                       fs = raf.getChannel();
-                       buf.clear();
-                       
-                       long readStart = System.currentTimeMillis();
-                       
-                       fs.read(buf);
-                       buf.flip();
-                       
-                       valsLeft = NUM_INTS_WRITTEN;
-                       while (valsLeft-- > 0) {
-                               if (buf.remaining() < 4) {
-                                       buf.compact();
-                                       fs.read(buf);
-                                       buf.flip();
-                               }
-                               if (buf.getInt() != valsLeft) {
-                                       throw new IOException();
-                               }
-                       }
-                       
-                       fs.close();
-                       raf.close();
-                       
-                       long readElapsed = System.currentTimeMillis() - 
readStart;
-                       
-                       LOG.info("NIO Channel with buffer " + bufferSize + ": 
write " + writeElapsed + " msecs, read " + readElapsed + " msecs.");
-               }
-               finally {
-                       // close if possible
-                       if (fs != null) {
-                               fs.close();
-                               fs = null;
-                       }
-                       // try to delete the file
-                       if (tempFile != null) {
-                               tempFile.delete();
-                       }
-               }
-       }
-
-}

Reply via email to