[ 
https://issues.apache.org/jira/browse/FLINK-2869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14977405#comment-14977405
 ] 

ASF GitHub Bot commented on FLINK-2869:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1270#discussion_r43203880
  
    --- Diff: 
flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
 ---
    @@ -0,0 +1,613 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.benchmark.runtime.io.disk.iomanager;
    +
    +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
    +import org.apache.flink.runtime.io.disk.iomanager.*;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import org.apache.flink.runtime.operators.testutils.DummyInvokable;
    +import org.apache.flink.types.IntValue;
    +import org.junit.Assert;
    +import org.openjdk.jmh.annotations.*;
    +import org.openjdk.jmh.runner.Runner;
    +import org.openjdk.jmh.runner.options.Options;
    +import org.openjdk.jmh.runner.options.OptionsBuilder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.*;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.FileChannel;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +@State(Scope.Thread)
    +@BenchmarkMode(Mode.AverageTime)
    +@OutputTimeUnit(TimeUnit.MILLISECONDS)
    +public class IOManagerPerformanceBenchmark {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
    +
    +   @Param({"4096", "16384", "524288"})
    +   private int segmentSizesAligned;
    +
    +   @Param({"3862", "16895", "500481"})
    +   private int segmentSizesUnaligned;
    +
    +   @Param({"1", "2", "4", "6"})
    +   private int numSegment;
    +
    +   private static int numBlocks;
    +
    +   private static final long MEMORY_SIZE = 32 * 1024 * 1024;
    +
    +   private static final int NUM_INTS_WRITTEN = 100000000;
    +
    +   private static final AbstractInvokable memoryOwner = new 
DummyInvokable();
    +
    +   private MemoryManager memManager;
    +
    +   private IOManager ioManager;
    +   
    +   private static FileIOChannel.ID fileIOChannel;
    +   
    +   private static File ioManagerTempFile1;
    +   
    +   private static File ioManagerTempFile2;
    +   
    +   private static File speedTestNIOTempFile1;
    +   
    +   private static File speedTestNIOTempFile2;
    +   
    +   private static File speedTestNIOTempFile3;
    +   
    +   private static File speedTestNIOTempFile4;
    +
    +
    +   @Setup
    +   public void startup() throws Exception {
    +           memManager = new MemoryManager(MEMORY_SIZE, 1);
    +           ioManager = new IOManagerAsync();
    +           this.testChannelWriteWithSegments(numSegment);
    +           ioManagerTempFile1 = 
this.createReadTempFile(segmentSizesAligned);
    +           ioManagerTempFile2 = 
this.createReadTempFile(segmentSizesUnaligned);
    +           speedTestNIOTempFile1 = 
creatSpeedTestNIOTempFile(segmentSizesAligned, true);
    +           speedTestNIOTempFile2 = 
creatSpeedTestNIOTempFile(segmentSizesAligned, false);
    +           speedTestNIOTempFile3 = 
creatSpeedTestNIOTempFile(segmentSizesUnaligned, true);
    +           speedTestNIOTempFile4 = 
creatSpeedTestNIOTempFile(segmentSizesUnaligned, false);
    +           
    +   }
    +
    +   @TearDown
    +   public void afterTest() throws Exception {
    +           ioManager.shutdown();
    +           Assert.assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
    +
    +           Assert.assertTrue("Not all memory was returned to the memory 
manager in the test.", memManager.verifyEmpty());
    +           memManager.shutdown();
    +           memManager = null;
    +   }
    +
    +// ------------------------------------------------------------------------
    +
    +   private File createReadTempFile(int bufferSize) throws IOException {
    +           final FileIOChannel.ID tmpChannel = ioManager.createChannel();
    +           final IntValue rec = new IntValue(0);
    +
    +           File tempFile = null;
    +           DataOutputStream daos = null;
    +
    +           try {
    +                   tempFile = new File(tmpChannel.getPath());
    +
    +                   FileOutputStream fos = new FileOutputStream(tempFile);
    +                   daos = new DataOutputStream(new 
BufferedOutputStream(fos, bufferSize));
    +
    +                   int valsLeft = NUM_INTS_WRITTEN;
    +                   while (valsLeft-- > 0) {
    +                           rec.setValue(valsLeft);
    +                           rec.write(new 
OutputViewDataOutputStreamWrapper(daos));
    +                   }
    +                   daos.close();
    +                   daos = null;
    +           }
    +           finally {
    +                   // close if possible
    +                   if (daos != null) {
    +                           daos.close();
    +                   }
    +           }
    +           return tempFile;
    +   }
    +
    +   @SuppressWarnings("resource")
    +   private File creatSpeedTestNIOTempFile(int bufferSize, boolean direct) 
throws IOException
    +   {
    +           final FileIOChannel.ID tmpChannel = ioManager.createChannel();
    +
    +           File tempFile = null;
    +           FileChannel fs = null;
    +
    +           try {
    +                   tempFile = new File(tmpChannel.getPath());
    +
    +                   RandomAccessFile raf = new RandomAccessFile(tempFile, 
"rw");
    +                   fs = raf.getChannel();
    +
    +                   ByteBuffer buf = direct ? 
ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
    +
    +                   int valsLeft = NUM_INTS_WRITTEN;
    +                   while (valsLeft-- > 0) {
    +                           if (buf.remaining() < 4) {
    +                                   buf.flip();
    +                                   fs.write(buf);
    +                                   buf.clear();
    +                           }
    +                           buf.putInt(valsLeft);
    +                   }
    +
    +                   if (buf.position() > 0) {
    +                           buf.flip();
    +                           fs.write(buf);
    +                   }
    +
    +                   fs.close();
    +                   raf.close();
    +                   fs = null;
    +           }
    +           finally {
    +                   // close if possible
    +                   if (fs != null) {
    +                           fs.close();
    +                           fs = null;
    +                   }
    +           }
    +           return tempFile;
    +   }
    +   
    +   @Benchmark
    +   public void speedTestOutputManager() throws Exception
    +   {
    +           LOG.info("Starting speed test with IO Manager...");
    +           
    +           testChannelWriteWithSegments(numSegment);
    +   }
    +
    +   @Benchmark
    +   public void speedTestInputManager() throws Exception
    +   {
    +           LOG.info("Starting speed test with IO Manager...");
    +           
    +           testChannelReadWithSegments(numSegment);
    +   }
    +
    +   private void testChannelWriteWithSegments(int numSegments) throws 
Exception
    +   {
    +           final List<MemorySegment> memory = 
this.memManager.allocatePages(memoryOwner, numSegments);
    +           final FileIOChannel.ID channel = this.ioManager.createChannel();
    +
    +           BlockChannelWriter<MemorySegment> writer = null;
    +
    +           try {
    +                   writer = 
this.ioManager.createBlockChannelWriter(channel);
    +                   final ChannelWriterOutputView out = new 
ChannelWriterOutputView(writer, memory, this.memManager.getPageSize());
    +
    +                   int valsLeft = NUM_INTS_WRITTEN;
    +                   while (valsLeft-- > 0) {
    +                           out.writeInt(valsLeft);
    +                   }
    +
    +                   fileIOChannel = channel;
    +                   out.close();
    +                   numBlocks = out.getBlockCount();
    +                   
    +                   writer.close();
    +                   writer = null;
    +
    +                   memManager.release(memory);
    +           }
    +           finally {
    +                   if (writer != null) {
    +                           writer.closeAndDelete();
    +                   }
    +           }
    +   }
    +
    +   private void testChannelReadWithSegments(int numSegments) throws 
Exception
    +   {
    +           final List<MemorySegment> memory = 
this.memManager.allocatePages(memoryOwner, numSegments);
    +           //final FileIOChannel.ID channel = 
this.ioManager.createChannel();
    --- End diff --
    
    remove?


> Apply JMH on IOManagerPerformanceBenchmark class.
> -------------------------------------------------
>
>                 Key: FLINK-2869
>                 URL: https://issues.apache.org/jira/browse/FLINK-2869
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>            Reporter: GaoLun
>            Assignee: GaoLun
>            Priority: Minor
>              Labels: easyfix
>
> JMH is a Java harness for building, running, and analysing 
> nano/micro/milli/macro benchmarks.Use JMH to replace the old micro benchmarks 
> method in order to get much more accurate results.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to