Repository: flink
Updated Branches:
  refs/heads/master daa357aca -> 6a8e90b36


http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegmentOutView.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegmentOutView.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegmentOutView.java
deleted file mode 100644
index d898dee..0000000
--- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/PureOffHeapMemorySegmentOutView.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory.benchmarks;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.util.List;
-
-public final class PureOffHeapMemorySegmentOutView implements DataOutputView {
-
-       private PureOffHeapMemorySegment currentSegment;        // the current 
memory segment to write to
-
-       private int positionInSegment;                                  // the 
offset in the current segment
-       
-       private final int segmentSize;                          // the size of 
the memory segments
-
-       private final  List<PureOffHeapMemorySegment> memorySource;
-       
-       private final List<PureOffHeapMemorySegment> fullSegments;
-       
-
-       private byte[] utfBuffer;               // the reusable array for UTF 
encodings
-
-
-       public PureOffHeapMemorySegmentOutView(List<PureOffHeapMemorySegment> 
emptySegments,
-                                                                               
   List<PureOffHeapMemorySegment> fullSegmentTarget, int segmentSize) {
-               this.segmentSize = segmentSize;
-               this.currentSegment = emptySegments.remove(emptySegments.size() 
- 1);
-
-               this.memorySource = emptySegments;
-               this.fullSegments = fullSegmentTarget;
-               this.fullSegments.add(getCurrentSegment());
-       }
-
-
-       public void reset() {
-               if (this.fullSegments.size() != 0) {
-                       throw new IllegalStateException("The target list still 
contains memory segments.");
-               }
-
-               clear();
-               try {
-                       advance();
-               }
-               catch (IOException ioex) {
-                       throw new RuntimeException("Error getting first segment 
for record collector.", ioex);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                  Page Management
-       // 
--------------------------------------------------------------------------------------------
-
-       public PureOffHeapMemorySegment nextSegment(PureOffHeapMemorySegment 
current, int positionInCurrent) throws EOFException {
-               int size = this.memorySource.size();
-               if (size > 0) {
-                       final PureOffHeapMemorySegment next = 
this.memorySource.remove(size - 1);
-                       this.fullSegments.add(next);
-                       return next;
-               } else {
-                       throw new EOFException();
-               }
-       }
-       
-       public PureOffHeapMemorySegment getCurrentSegment() {
-               return this.currentSegment;
-       }
-
-       public int getCurrentPositionInSegment() {
-               return this.positionInSegment;
-       }
-       
-       public int getSegmentSize() {
-               return this.segmentSize;
-       }
-       
-       protected void advance() throws IOException {
-               this.currentSegment = nextSegment(this.currentSegment, 
this.positionInSegment);
-               this.positionInSegment = 0;
-       }
-       
-       protected void seekOutput(PureOffHeapMemorySegment seg, int position) {
-               this.currentSegment = seg;
-               this.positionInSegment = position;
-       }
-
-       protected void clear() {
-               this.currentSegment = null;
-               this.positionInSegment = 0;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //                               Data Output Specific methods
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void write(int b) throws IOException {
-               writeByte(b);
-       }
-
-       @Override
-       public void write(byte[] b) throws IOException {
-               write(b, 0, b.length);
-       }
-
-       @Override
-       public void write(byte[] b, int off, int len) throws IOException {
-               int remaining = this.segmentSize - this.positionInSegment;
-               if (remaining >= len) {
-                       this.currentSegment.put(this.positionInSegment, b, off, 
len);
-                       this.positionInSegment += len;
-               }
-               else {
-                       if (remaining == 0) {
-                               advance();
-                               remaining = this.segmentSize - 
this.positionInSegment;
-                       }
-                       while (true) {
-                               int toPut = Math.min(remaining, len);
-                               this.currentSegment.put(this.positionInSegment, 
b, off, toPut);
-                               off += toPut;
-                               len -= toPut;
-
-                               if (len > 0) {
-                                       this.positionInSegment = 
this.segmentSize;
-                                       advance();
-                                       remaining = this.segmentSize - 
this.positionInSegment;
-                               }
-                               else {
-                                       this.positionInSegment += toPut;
-                                       break;
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void writeBoolean(boolean v) throws IOException {
-               writeByte(v ? 1 : 0);
-       }
-
-       @Override
-       public void writeByte(int v) throws IOException {
-               if (this.positionInSegment < this.segmentSize) {
-                       this.currentSegment.put(this.positionInSegment++, 
(byte) v);
-               }
-               else {
-                       advance();
-                       writeByte(v);
-               }
-       }
-
-       @Override
-       public void writeShort(int v) throws IOException {
-               if (this.positionInSegment < this.segmentSize - 1) {
-                       
this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v);
-                       this.positionInSegment += 2;
-               }
-               else if (this.positionInSegment == this.segmentSize) {
-                       advance();
-                       writeShort(v);
-               }
-               else {
-                       writeByte(v >> 8);
-                       writeByte(v);
-               }
-       }
-
-       @Override
-       public void writeChar(int v) throws IOException {
-               if (this.positionInSegment < this.segmentSize - 1) {
-                       
this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v);
-                       this.positionInSegment += 2;
-               }
-               else if (this.positionInSegment == this.segmentSize) {
-                       advance();
-                       writeChar(v);
-               }
-               else {
-                       writeByte(v >> 8);
-                       writeByte(v);
-               }
-       }
-
-       @Override
-       public void writeInt(int v) throws IOException {
-               if (this.positionInSegment < this.segmentSize - 3) {
-                       
this.currentSegment.putIntBigEndian(this.positionInSegment, v);
-                       this.positionInSegment += 4;
-               }
-               else if (this.positionInSegment == this.segmentSize) {
-                       advance();
-                       writeInt(v);
-               }
-               else {
-                       writeByte(v >> 24);
-                       writeByte(v >> 16);
-                       writeByte(v >>  8);
-                       writeByte(v);
-               }
-       }
-
-       @Override
-       public void writeLong(long v) throws IOException {
-               if (this.positionInSegment < this.segmentSize - 7) {
-                       
this.currentSegment.putLongBigEndian(this.positionInSegment, v);
-                       this.positionInSegment += 8;
-               }
-               else if (this.positionInSegment == this.segmentSize) {
-                       advance();
-                       writeLong(v);
-               }
-               else {
-                       writeByte((int) (v >> 56));
-                       writeByte((int) (v >> 48));
-                       writeByte((int) (v >> 40));
-                       writeByte((int) (v >> 32));
-                       writeByte((int) (v >> 24));
-                       writeByte((int) (v >> 16));
-                       writeByte((int) (v >>  8));
-                       writeByte((int) v);
-               }
-       }
-
-       @Override
-       public void writeFloat(float v) throws IOException {
-               writeInt(Float.floatToRawIntBits(v));
-       }
-
-       @Override
-       public void writeDouble(double v) throws IOException {
-               writeLong(Double.doubleToRawLongBits(v));
-       }
-
-       @Override
-       public void writeBytes(String s) throws IOException {
-               for (int i = 0; i < s.length(); i++) {
-                       writeByte(s.charAt(i));
-               }
-       }
-
-       @Override
-       public void writeChars(String s) throws IOException {
-               for (int i = 0; i < s.length(); i++) {
-                       writeChar(s.charAt(i));
-               }
-       }
-
-       @Override
-       public void writeUTF(String str) throws IOException {
-               int strlen = str.length();
-               int utflen = 0;
-               int c, count = 0;
-
-               /* use charAt instead of copying String to char array */
-               for (int i = 0; i < strlen; i++) {
-                       c = str.charAt(i);
-                       if ((c >= 0x0001) && (c <= 0x007F)) {
-                               utflen++;
-                       } else if (c > 0x07FF) {
-                               utflen += 3;
-                       } else {
-                               utflen += 2;
-                       }
-               }
-
-               if (utflen > 65535) {
-                       throw new UTFDataFormatException("encoded string too 
long: " + utflen + " memory");
-               }
-
-               if (this.utfBuffer == null || this.utfBuffer.length < utflen + 
2) {
-                       this.utfBuffer = new byte[utflen + 2];
-               }
-               final byte[] bytearr = this.utfBuffer;
-
-               bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-               bytearr[count++] = (byte) (utflen & 0xFF);
-
-               int i = 0;
-               for (i = 0; i < strlen; i++) {
-                       c = str.charAt(i);
-                       if (!((c >= 0x0001) && (c <= 0x007F))) {
-                               break;
-                       }
-                       bytearr[count++] = (byte) c;
-               }
-
-               for (; i < strlen; i++) {
-                       c = str.charAt(i);
-                       if ((c >= 0x0001) && (c <= 0x007F)) {
-                               bytearr[count++] = (byte) c;
-
-                       } else if (c > 0x07FF) {
-                               bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 
0x0F));
-                               bytearr[count++] = (byte) (0x80 | ((c >> 6) & 
0x3F));
-                               bytearr[count++] = (byte) (0x80 | (c & 0x3F));
-                       } else {
-                               bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 
0x1F));
-                               bytearr[count++] = (byte) (0x80 | (c & 0x3F));
-                       }
-               }
-
-               write(bytearr, 0, utflen + 2);
-       }
-
-       @Override
-       public void skipBytesToWrite(int numBytes) throws IOException {
-               while (numBytes > 0) {
-                       final int remaining = this.segmentSize - 
this.positionInSegment;
-                       if (numBytes <= remaining) {
-                               this.positionInSegment += numBytes;
-                               return;
-                       }
-                       this.positionInSegment = this.segmentSize;
-                       advance();
-                       numBytes -= remaining;
-               }
-       }
-
-       @Override
-       public void write(DataInputView source, int numBytes) throws 
IOException {
-               while (numBytes > 0) {
-                       final int remaining = this.segmentSize - 
this.positionInSegment;
-                       if (numBytes <= remaining) {
-                               this.currentSegment.put(source, 
this.positionInSegment, numBytes);
-                               this.positionInSegment += numBytes;
-                               return;
-                       }
-
-                       if (remaining > 0) {
-                               this.currentSegment.put(source, 
this.positionInSegment, remaining);
-                               this.positionInSegment = this.segmentSize;
-                               numBytes -= remaining;
-                       }
-
-                       advance();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75a52574/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/StringSerializationSpeedBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/StringSerializationSpeedBenchmark.java b/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/StringSerializationSpeedBenchmark.java
deleted file mode 100644
index 2163eb5..0000000
--- a/flink-core/src/test/java/org/apache/flink/core/memory/benchmarks/StringSerializationSpeedBenchmark.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory.benchmarks;
-
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegment;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Random;
-
-public class StringSerializationSpeedBenchmark {
-       
-       public static void main(String[] args) throws Exception {
-               
-               final int LARGE_SEGMENT_SIZE = 1024 * 1024 * 1024;
-
-               final byte[] largeSegment = new byte[LARGE_SEGMENT_SIZE];
-
-               final ByteBuffer largeOffHeap = 
ByteBuffer.allocateDirect(LARGE_SEGMENT_SIZE);
-
-               final String[] randomStrings = 
generateRandomStrings(5468917685263896L, 1000, 128, 6, true);
-
-               final StringSerializer ser = StringSerializer.INSTANCE;
-               
-               final int outerRounds = 10;
-               final int innerRounds = 5000;
-
-               {
-                       System.out.println("testing core heap memory segment");
-
-                       long start = System.nanoTime();
-                       for (int outer = 0; outer < outerRounds; outer++) {
-
-                               ArrayList<MemorySegment> memory = new 
ArrayList<>();
-                               
memory.add(HeapMemorySegment.FACTORY.wrapPooledHeapMemory(largeSegment, null));
-                               ArrayList<MemorySegment> target = new 
ArrayList<>();
-
-                               CoreMemorySegmentOutView output = new 
CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-
-                               for (int i = 0; i < innerRounds; i++) {
-                                       for (String s : randomStrings) {
-                                               ser.serialize(s, output);
-                                       }
-                               }
-                       }
-                       long stop = System.nanoTime();
-
-                       System.out.println(String.format("Core heap memory 
segment took %,d msecs", (stop - start) / 1000000));
-               }
-
-               {
-                       System.out.println("testing core hybrid memory segment 
on heap");
-
-                       long start = System.nanoTime();
-                       for (int outer = 0; outer < outerRounds; outer++) {
-
-                               ArrayList<MemorySegment> memory = new 
ArrayList<>();
-                               
memory.add(HybridMemorySegment.FACTORY.wrapPooledHeapMemory(largeSegment, 
null));
-                               ArrayList<MemorySegment> target = new 
ArrayList<>();
-
-                               CoreMemorySegmentOutView output = new 
CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-
-                               for (int i = 0; i < innerRounds; i++) {
-                                       for (String s : randomStrings) {
-                                               ser.serialize(s, output);
-                                       }
-                               }
-                       }
-                       long stop = System.nanoTime();
-
-                       System.out.println(String.format("Core hybrid memory 
segment on heap took %,d msecs", (stop - start) / 1000000));
-               }
-
-               {
-                       System.out.println("testing core hybrid memory segment 
off heap");
-
-                       long start = System.nanoTime();
-                       for (int outer = 0; outer < outerRounds; outer++) {
-
-                               ArrayList<MemorySegment> memory = new 
ArrayList<>();
-                               
memory.add(HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(largeOffHeap, 
null));
-                               ArrayList<MemorySegment> target = new 
ArrayList<>();
-
-                               CoreMemorySegmentOutView output = new 
CoreMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-
-                               for (int i = 0; i < innerRounds; i++) {
-                                       for (String s : randomStrings) {
-                                               ser.serialize(s, output);
-                                       }
-                               }
-                       }
-                       long stop = System.nanoTime();
-
-                       System.out.println(String.format("Core hybrid memory 
segment off heap took %,d msecs", (stop - start) / 1000000));
-               }
-               
-               {
-                       System.out.println("testing pure hybrid memory segment 
on heap");
-
-                       long start = System.nanoTime();
-                       for (int outer = 0; outer < outerRounds; outer++) {
-                       
-                               ArrayList<PureHybridMemorySegment> memory = new 
ArrayList<>();
-                               memory.add(new 
PureHybridMemorySegment(largeSegment));
-                               ArrayList<PureHybridMemorySegment> target = new 
ArrayList<>();
-       
-                               PureHybridMemorySegmentOutView output = new 
PureHybridMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-                                       
-                               for (int i = 0; i < innerRounds; i++) {
-                                       for (String s : randomStrings) {
-                                               ser.serialize(s, output);
-                                       }
-                               }
-                       }
-                       long stop = System.nanoTime();
-
-                       System.out.println(String.format("Pure hybrid on heap 
memory segment took %,d msecs", (stop - start) / 1000000));
-               }
-
-               {
-                       System.out.println("testing pure hybrid memory segment 
off heap");
-
-                       long start = System.nanoTime();
-                       for (int outer = 0; outer < outerRounds; outer++) {
-                               
-                               ArrayList<PureHybridMemorySegment> memory = new 
ArrayList<>();
-                               memory.add(new 
PureHybridMemorySegment(largeOffHeap));
-                               ArrayList<PureHybridMemorySegment> target = new 
ArrayList<>();
-       
-                               PureHybridMemorySegmentOutView output = new 
PureHybridMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-                               
-                               for (int i = 0; i < innerRounds; i++) {
-                                       for (String s : randomStrings) {
-                                               ser.serialize(s, output);
-                                       }
-                               }
-                       }
-                       long stop = System.nanoTime();
-
-                       System.out.println(String.format("Pure hybrid off heap 
memory segment took %,d msecs", (stop - start) / 1000000));
-               }
-               
-               {
-                       System.out.println("testing pure heap memory segment");
-
-                       long start = System.nanoTime();
-                       for (int outer = 0; outer < outerRounds; outer++) {
-                                       
-                               ArrayList<PureHeapMemorySegment> memory = new 
ArrayList<>();
-                               memory.add(new 
PureHeapMemorySegment(largeSegment));
-                               ArrayList<PureHeapMemorySegment> target = new 
ArrayList<>();
-                               
-                               PureHeapMemorySegmentOutView output = new 
PureHeapMemorySegmentOutView(memory, target, LARGE_SEGMENT_SIZE);
-                               
-                               for (int i = 0; i < innerRounds; i++) {
-                                       for (String s : randomStrings) {
-                                               ser.serialize(s, output);
-                                       }
-                               }
-                       }
-                       long stop = System.nanoTime();
-
-                       System.out.println(String.format("Pure heap memory 
segment took %,d msecs", (stop - start) / 1000000));
-               }
-       }
-       
-       private static String[] generateRandomStrings(long seed, int num, int 
maxLen, int minLen, boolean asciiOnly) {
-               Random rnd = new Random(seed);
-               String[] array = new String[num];
-               StringBuilder bld = new StringBuilder(maxLen);
-               
-               int minCharValue = 40;
-               int charRange = asciiOnly ? 60 : 30000;
-               
-               for (int i = 0; i < num; i++) {
-                       bld.setLength(0);
-                       int len = rnd.nextInt(maxLen - minLen) + minLen;
-                       
-                       for (int k = 0; k < len; k++) {
-                               bld.append((char) (rnd.nextInt(charRange) + 
minCharValue));
-                       }
-                       
-                       array[i] = bld.toString();
-               }
-               
-               return array;
-       }
-}

Reply via email to