This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 39f5f1b0f09c37400ba113fdf33f90a832de5f0d Author: Roman Khachatryan <[email protected]> AuthorDate: Wed May 6 17:54:05 2020 +0200 [FLINK-17547][task][hotfix] Extract NonSpanningWrapper from SpillingAdaptiveSpanningRecordDeserializer (static inner class) As it is, no logical changes. --- .../api/serialization/NonSpanningWrapper.java | 296 +++++++++++++++++++++ ...SpillingAdaptiveSpanningRecordDeserializer.java | 271 ------------------- 2 files changed, 296 insertions(+), 271 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java new file mode 100644 index 0000000..bab50fa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.serialization; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.util.Optional; + +final class NonSpanningWrapper implements DataInputView { + + MemorySegment segment; + + private int limit; + + int position; + + private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding + private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding + + int remaining() { + return this.limit - this.position; + } + + void clear() { + this.segment = null; + this.limit = 0; + this.position = 0; + } + + void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) { + this.segment = seg; + this.position = position; + this.limit = leftOverLimit; + } + + Optional<MemorySegment> getUnconsumedSegment() { + if (remaining() == 0) { + return Optional.empty(); + } + MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining()); + segment.copyTo(position, target, 0, remaining()); + return Optional.of(target); + } + + // ------------------------------------------------------------------------------------------------------------- + // DataInput specific methods + // ------------------------------------------------------------------------------------------------------------- + + @Override + public final void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + @Override + public final void readFully(byte[] b, int off, int len) throws IOException { + if (off < 0 || len < 0 || off + len > b.length) { + throw new IndexOutOfBoundsException(); + } + + this.segment.get(this.position, b, off, len); + this.position += len; + } + + @Override + public final boolean readBoolean() throws IOException { + return readByte() == 1; + } + + @Override + public final byte readByte() throws IOException { + return this.segment.get(this.position++); + } + + @Override + public final int readUnsignedByte() throws IOException { + return readByte() & 0xff; + } + + @Override + public final short readShort() throws IOException { + final short v = this.segment.getShortBigEndian(this.position); + this.position += 2; + return v; + } + + @Override + public final int readUnsignedShort() throws IOException { + final int v = this.segment.getShortBigEndian(this.position) & 0xffff; + this.position += 2; + return v; + } + + @Override + public final char readChar() throws IOException { + final char v = this.segment.getCharBigEndian(this.position); + this.position += 2; + return v; + } + + @Override + public final int readInt() throws IOException { + final int v = this.segment.getIntBigEndian(this.position); + this.position += 4; + return v; + } + + @Override + public final long readLong() throws IOException { + final long v = this.segment.getLongBigEndian(this.position); + this.position += 8; + return v; + } + + @Override + public final float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + @Override + public final double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + @Override + public final String readLine() throws IOException { + final StringBuilder bld = new StringBuilder(32); + + try { + int b; + while ((b = readUnsignedByte()) != '\n') { + if (b != '\r') { + bld.append((char) b); + } + } + } + catch (EOFException ignored) {} + + if (bld.length() == 0) { + return null; + } + + // trim a trailing carriage return + int len = bld.length(); + if (len > 0 && bld.charAt(len - 1) == '\r') { + bld.setLength(len - 1); + } + return bld.toString(); + } + + @Override + public final String readUTF() throws IOException { + final int utflen = readUnsignedShort(); + + final byte[] bytearr; + final char[] chararr; + + if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) { + bytearr = new byte[utflen]; + this.utfByteBuffer = bytearr; + } else { + bytearr = this.utfByteBuffer; + } + if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) { + chararr = new char[utflen]; + this.utfCharBuffer = chararr; + } else { + chararr = this.utfCharBuffer; + } + + int c, char2, char3; + int count = 0; + int chararrCount = 0; + + readFully(bytearr, 0, utflen); + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + if (c > 127) { + break; + } + count++; + chararr[chararrCount++] = (char) c; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + switch (c >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + count++; + chararr[chararrCount++] = (char) c; + break; + case 12: + case 13: + count += 2; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 1]; + if ((char2 & 0xC0) != 0x80) { + throw new UTFDataFormatException("malformed input around byte " + count); + } + chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); + break; + case 14: + count += 3; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 2]; + char3 = (int) bytearr[count - 1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { + throw new UTFDataFormatException("malformed input around byte " + (count - 1)); + } + chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); + break; + default: + throw new UTFDataFormatException("malformed input around byte " + count); + } + } + // The number of chars produced may be less than utflen + return new String(chararr, 0, chararrCount); + } + + @Override + public final int skipBytes(int n) throws IOException { + if (n < 0) { + throw new IllegalArgumentException(); + } + + int toSkip = Math.min(n, remaining()); + this.position += toSkip; + return toSkip; + } + + @Override + public void skipBytesToRead(int numBytes) throws IOException { + int skippedBytes = skipBytes(numBytes); + + if (skippedBytes < numBytes){ + throw new EOFException("Could not skip " + numBytes + " bytes."); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null){ + throw new NullPointerException("Byte array b cannot be null."); + } + + if (off < 0){ + throw new IllegalArgumentException("The offset off cannot be negative."); + } + + if (len < 0){ + throw new IllegalArgumentException("The length len cannot be negative."); + } + + int toRead = Math.min(len, remaining()); + this.segment.get(this.position, b, off, toRead); + this.position += toRead; + + return toRead; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 346bdfc..5003e78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -32,12 +32,10 @@ import org.apache.flink.util.FileUtils; import org.apache.flink.util.StringUtils; import java.io.BufferedInputStream; -import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.RandomAccessFile; -import java.io.UTFDataFormatException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; @@ -184,275 +182,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit // ----------------------------------------------------------------------------------------------------------------- - private static final class NonSpanningWrapper implements DataInputView { - - private MemorySegment segment; - - private int limit; - - private int position; - - private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding - private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding - - int remaining() { - return this.limit - this.position; - } - - void clear() { - this.segment = null; - this.limit = 0; - this.position = 0; - } - - void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) { - this.segment = seg; - this.position = position; - this.limit = leftOverLimit; - } - - Optional<MemorySegment> getUnconsumedSegment() { - if (remaining() == 0) { - return Optional.empty(); - } - MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining()); - segment.copyTo(position, target, 0, remaining()); - return Optional.of(target); - } - - // ------------------------------------------------------------------------------------------------------------- - // DataInput specific methods - // ------------------------------------------------------------------------------------------------------------- - - @Override - public final void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - @Override - public final void readFully(byte[] b, int off, int len) throws IOException { - if (off < 0 || len < 0 || off + len > b.length) { - throw new IndexOutOfBoundsException(); - } - - this.segment.get(this.position, b, off, len); - this.position += len; - } - - @Override - public final boolean readBoolean() throws IOException { - return readByte() == 1; - } - - @Override - public final byte readByte() throws IOException { - return this.segment.get(this.position++); - } - - @Override - public final int readUnsignedByte() throws IOException { - return readByte() & 0xff; - } - - @Override - public final short readShort() throws IOException { - final short v = this.segment.getShortBigEndian(this.position); - this.position += 2; - return v; - } - - @Override - public final int readUnsignedShort() throws IOException { - final int v = this.segment.getShortBigEndian(this.position) & 0xffff; - this.position += 2; - return v; - } - - @Override - public final char readChar() throws IOException { - final char v = this.segment.getCharBigEndian(this.position); - this.position += 2; - return v; - } - - @Override - public final int readInt() throws IOException { - final int v = this.segment.getIntBigEndian(this.position); - this.position += 4; - return v; - } - - @Override - public final long readLong() throws IOException { - final long v = this.segment.getLongBigEndian(this.position); - this.position += 8; - return v; - } - - @Override - public final float readFloat() throws IOException { - return Float.intBitsToFloat(readInt()); - } - - @Override - public final double readDouble() throws IOException { - return Double.longBitsToDouble(readLong()); - } - - @Override - public final String readLine() throws IOException { - final StringBuilder bld = new StringBuilder(32); - - try { - int b; - while ((b = readUnsignedByte()) != '\n') { - if (b != '\r') { - bld.append((char) b); - } - } - } - catch (EOFException ignored) {} - - if (bld.length() == 0) { - return null; - } - - // trim a trailing carriage return - int len = bld.length(); - if (len > 0 && bld.charAt(len - 1) == '\r') { - bld.setLength(len - 1); - } - return bld.toString(); - } - - @Override - public final String readUTF() throws IOException { - final int utflen = readUnsignedShort(); - - final byte[] bytearr; - final char[] chararr; - - if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) { - bytearr = new byte[utflen]; - this.utfByteBuffer = bytearr; - } else { - bytearr = this.utfByteBuffer; - } - if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) { - chararr = new char[utflen]; - this.utfCharBuffer = chararr; - } else { - chararr = this.utfCharBuffer; - } - - int c, char2, char3; - int count = 0; - int chararrCount = 0; - - readFully(bytearr, 0, utflen); - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - if (c > 127) { - break; - } - count++; - chararr[chararrCount++] = (char) c; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - switch (c >> 4) { - case 0: - case 1: - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - count++; - chararr[chararrCount++] = (char) c; - break; - case 12: - case 13: - count += 2; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = (int) bytearr[count - 1]; - if ((char2 & 0xC0) != 0x80) { - throw new UTFDataFormatException("malformed input around byte " + count); - } - chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); - break; - case 14: - count += 3; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = (int) bytearr[count - 2]; - char3 = (int) bytearr[count - 1]; - if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { - throw new UTFDataFormatException("malformed input around byte " + (count - 1)); - } - chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); - break; - default: - throw new UTFDataFormatException("malformed input around byte " + count); - } - } - // The number of chars produced may be less than utflen - return new String(chararr, 0, chararrCount); - } - - @Override - public final int skipBytes(int n) throws IOException { - if (n < 0) { - throw new IllegalArgumentException(); - } - - int toSkip = Math.min(n, remaining()); - this.position += toSkip; - return toSkip; - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - int skippedBytes = skipBytes(numBytes); - - if (skippedBytes < numBytes){ - throw new EOFException("Could not skip " + numBytes + " bytes."); - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null){ - throw new NullPointerException("Byte array b cannot be null."); - } - - if (off < 0){ - throw new IllegalArgumentException("The offset off cannot be negative."); - } - - if (len < 0){ - throw new IllegalArgumentException("The length len cannot be negative."); - } - - int toRead = Math.min(len, remaining()); - this.segment.get(this.position, b, off, toRead); - this.position += toRead; - - return toRead; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - } - // ----------------------------------------------------------------------------------------------------------------- private static final class SpanningWrapper {
