This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ebdaf40973e1b7ff6aaadd19d1d6cda1d3e69d8
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue May 12 11:24:01 2020 +0200

    [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer
---
 .../main/java/org/apache/flink/util/IOUtils.java   |   9 +
 .../api/serialization/NonSpanningWrapper.java      |  81 +++++-
 .../network/api/serialization/SpanningWrapper.java | 278 ++++++++++-----------
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 121 ++++-----
 4 files changed, 271 insertions(+), 218 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
index 02b11e6..1f9af18 100644
--- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java
@@ -26,6 +26,8 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.Socket;
 
+import static java.util.Arrays.asList;
+
 /**
  * An utility class for I/O related functionality.
  */
@@ -244,6 +246,13 @@ public final class IOUtils {
        /**
         * Closes all elements in the iterable with closeQuietly().
         */
+       public static void closeAllQuietly(AutoCloseable... closeables) {
+               closeAllQuietly(asList(closeables));
+       }
+
+       /**
+        * Closes all elements in the iterable with closeQuietly().
+        */
        public static void closeAllQuietly(Iterable<? extends AutoCloseable> 
closeables) {
                if (null != closeables) {
                        for (AutoCloseable closeable : closeables) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index 6d9602f..5de5467 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -17,27 +17,43 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
+import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
+import static 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+
 final class NonSpanningWrapper implements DataInputView {
 
-       MemorySegment segment;
+       private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
+                       "Serializer consumed more bytes than the record had. " +
+                                       "This indicates broken serialization. 
If you are using custom serialization types " +
+                                       "(Value or Writable), check their 
serialization methods. If you are using a " +
+                                       "Kryo-serialized type, check the 
corresponding Kryo serializer.";
+
+       private MemorySegment segment;
 
        private int limit;
 
-       int position;
+       private int position;
 
        private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
        private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
 
-       int remaining() {
+       private final NextRecordResponse reusedNextRecordResponse = new 
NextRecordResponse(null, 0); // performance impact of immutable objects not 
benchmarked
+
+       private int remaining() {
                return this.limit - this.position;
        }
 
@@ -47,14 +63,14 @@ final class NonSpanningWrapper implements DataInputView {
                this.position = 0;
        }
 
-       void initializeFromMemorySegment(MemorySegment seg, int position, int 
leftOverLimit) {
+       void initializeFromMemorySegment(MemorySegment seg, int position, int 
limit) {
                this.segment = seg;
                this.position = position;
-               this.limit = leftOverLimit;
+               this.limit = limit;
        }
 
        Optional<MemorySegment> getUnconsumedSegment() {
-               if (remaining() == 0) {
+               if (!hasRemaining()) {
                        return Optional.empty();
                }
                MemorySegment target = 
MemorySegmentFactory.allocateUnpooledSegment(remaining());
@@ -62,6 +78,10 @@ final class NonSpanningWrapper implements DataInputView {
                return Optional.of(target);
        }
 
+       boolean hasRemaining() {
+               return remaining() > 0;
+       }
+
        // 
-------------------------------------------------------------------------------------------------------------
        //                                       DataInput specific methods
        // 
-------------------------------------------------------------------------------------------------------------
@@ -290,4 +310,53 @@ final class NonSpanningWrapper implements DataInputView {
        public int read(byte[] b) {
                return read(b, 0, b.length);
        }
+
+       ByteBuffer wrapIntoByteBuffer() {
+               return segment.wrap(position, remaining());
+       }
+
+       int copyContentTo(byte[] dst) {
+               final int numBytesChunk = remaining();
+               segment.get(position, dst, 0, numBytesChunk);
+               return numBytesChunk;
+       }
+
+       /**
+        * Copies the data and transfers the "ownership" (i.e. clears current 
wrapper).
+        */
+       void transferTo(ByteBuffer dst) {
+               segment.get(position, dst, remaining());
+               clear();
+       }
+
+       NextRecordResponse getNextRecord(IOReadableWritable target) throws 
IOException {
+               int recordLen = readInt();
+               if (canReadRecord(recordLen)) {
+                       return readInto(target);
+               } else {
+                       return reusedNextRecordResponse.updated(PARTIAL_RECORD, 
recordLen);
+               }
+       }
+
+       private NextRecordResponse readInto(IOReadableWritable target) throws 
IOException {
+               try {
+                       target.read(this);
+               } catch (IndexOutOfBoundsException e) {
+                       throw new 
IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
+               }
+               int remaining = remaining();
+               if (remaining < 0) {
+                       throw new 
IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, new 
IndexOutOfBoundsException("Remaining = " + remaining));
+               }
+               return reusedNextRecordResponse.updated(remaining == 0 ? 
LAST_RECORD_FROM_BUFFER : INTERMEDIATE_RECORD_FROM_BUFFER, remaining);
+       }
+
+       boolean hasCompleteLength() {
+               return remaining() >= LENGTH_BYTES;
+       }
+
+       private boolean canReadRecord(int recordLength) {
+               return recordLength <= remaining();
+       }
+
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index e59363f..430f0db 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
@@ -38,9 +37,16 @@ import java.util.Arrays;
 import java.util.Optional;
 import java.util.Random;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
+import static org.apache.flink.util.FileUtils.writeCompletely;
+import static org.apache.flink.util.IOUtils.closeAllQuietly;
+
 final class SpanningWrapper {
 
        private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 
MiBytes
+       private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024;
 
        private final byte[] initialBuffer = new byte[1024];
 
@@ -50,7 +56,7 @@ final class SpanningWrapper {
 
        private final DataInputDeserializer serializationReadBuffer;
 
-       private final ByteBuffer lengthBuffer;
+       final ByteBuffer lengthBuffer;
 
        private FileChannel spillingChannel;
 
@@ -70,10 +76,10 @@ final class SpanningWrapper {
 
        private DataInputViewStreamWrapper spillFileReader;
 
-       public SpanningWrapper(String[] tempDirs) {
+       SpanningWrapper(String[] tempDirs) {
                this.tempDirs = tempDirs;
 
-               this.lengthBuffer = ByteBuffer.allocate(4);
+               this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
                this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
 
                this.recordLength = -1;
@@ -82,187 +88,161 @@ final class SpanningWrapper {
                this.buffer = initialBuffer;
        }
 
-       void initializeWithPartialRecord(NonSpanningWrapper partial, int 
nextRecordLength) throws IOException {
-               // set the length and copy what is available to the buffer
-               this.recordLength = nextRecordLength;
-
-               final int numBytesChunk = partial.remaining();
-
-               if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
-                       // create a spilling channel and put the data there
-                       this.spillingChannel = createSpillingChannel();
-
-                       ByteBuffer toWrite = 
partial.segment.wrap(partial.position, numBytesChunk);
-                       FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
-               }
-               else {
-                       // collect in memory
-                       ensureBufferCapacity(nextRecordLength);
-                       partial.segment.get(partial.position, buffer, 0, 
numBytesChunk);
-               }
-
-               this.accumulatedRecordBytes = numBytesChunk;
+       /**
+        * Copies the data and transfers the "ownership" (i.e. clears the 
passed wrapper).
+        */
+       void transferFrom(NonSpanningWrapper partial, int nextRecordLength) 
throws IOException {
+               updateLength(nextRecordLength);
+               accumulatedRecordBytes = isAboveSpillingThreshold() ? 
spill(partial) : partial.copyContentTo(buffer);
+               partial.clear();
        }
 
-       void initializeWithPartialLength(NonSpanningWrapper partial) throws 
IOException {
-               // copy what we have to the length buffer
-               partial.segment.get(partial.position, this.lengthBuffer, 
partial.remaining());
+       private boolean isAboveSpillingThreshold() {
+               return recordLength > THRESHOLD_FOR_SPILLING;
        }
 
        void addNextChunkFromMemorySegment(MemorySegment segment, int offset, 
int numBytes) throws IOException {
-               int segmentPosition = offset;
-               int segmentRemaining = numBytes;
-               // check where to go. if we have a partial length, we need to 
complete it first
-               if (this.lengthBuffer.position() > 0) {
-                       int toPut = Math.min(this.lengthBuffer.remaining(), 
segmentRemaining);
-                       segment.get(segmentPosition, this.lengthBuffer, toPut);
-                       // did we complete the length?
-                       if (this.lengthBuffer.hasRemaining()) {
-                               return;
-                       } else {
-                               this.recordLength = this.lengthBuffer.getInt(0);
-
-                               this.lengthBuffer.clear();
-                               segmentPosition += toPut;
-                               segmentRemaining -= toPut;
-                               if (this.recordLength > THRESHOLD_FOR_SPILLING) 
{
-                                       this.spillingChannel = 
createSpillingChannel();
-                               } else {
-                                       ensureBufferCapacity(this.recordLength);
-                               }
-                       }
+               int limit = offset + numBytes;
+               int numBytesRead = isReadingLength() ? readLength(segment, 
offset, numBytes) : 0;
+               offset += numBytesRead;
+               numBytes -= numBytesRead;
+               if (numBytes == 0) {
+                       return;
                }
 
-               // copy as much as we need or can for this next spanning record
-               int needed = this.recordLength - this.accumulatedRecordBytes;
-               int toCopy = Math.min(needed, segmentRemaining);
+               int toCopy = min(recordLength - accumulatedRecordBytes, 
numBytes);
+               if (toCopy > 0) {
+                       copyFromSegment(segment, offset, toCopy);
+               }
+               if (numBytes > toCopy) {
+                       leftOverData = segment;
+                       leftOverStart = offset + toCopy;
+                       leftOverLimit = limit;
+               }
+       }
 
-               if (spillingChannel != null) {
-                       // spill to file
-                       ByteBuffer toWrite = segment.wrap(segmentPosition, 
toCopy);
-                       FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
+       private void copyFromSegment(MemorySegment segment, int offset, int 
length) throws IOException {
+               if (spillingChannel == null) {
+                       copyIntoBuffer(segment, offset, length);
                } else {
-                       segment.get(segmentPosition, buffer, 
this.accumulatedRecordBytes, toCopy);
+                       copyIntoFile(segment, offset, length);
                }
+       }
 
-               this.accumulatedRecordBytes += toCopy;
-
-               if (toCopy < segmentRemaining) {
-                       // there is more data in the segment
-                       this.leftOverData = segment;
-                       this.leftOverStart = segmentPosition + toCopy;
-                       this.leftOverLimit = numBytes + offset;
+       private void copyIntoFile(MemorySegment segment, int offset, int 
length) throws IOException {
+               writeCompletely(spillingChannel, segment.wrap(offset, length));
+               accumulatedRecordBytes += length;
+               if (hasFullRecord()) {
+                       spillingChannel.close();
+                       spillFileReader = new DataInputViewStreamWrapper(new 
BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
                }
+       }
 
-               if (accumulatedRecordBytes == recordLength) {
-                       // we have the full record
-                       if (spillingChannel == null) {
-                               this.serializationReadBuffer.setBuffer(buffer, 
0, recordLength);
-                       }
-                       else {
-                               spillingChannel.close();
+       private void copyIntoBuffer(MemorySegment segment, int offset, int 
length) {
+               segment.get(offset, buffer, accumulatedRecordBytes, length);
+               accumulatedRecordBytes += length;
+               if (hasFullRecord()) {
+                       serializationReadBuffer.setBuffer(buffer, 0, 
recordLength);
+               }
+       }
 
-                               BufferedInputStream inStream = new 
BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
-                               this.spillFileReader = new 
DataInputViewStreamWrapper(inStream);
-                       }
+       private int readLength(MemorySegment segment, int segmentPosition, int 
segmentRemaining) throws IOException {
+               int bytesToRead = min(lengthBuffer.remaining(), 
segmentRemaining);
+               segment.get(segmentPosition, lengthBuffer, bytesToRead);
+               if (!lengthBuffer.hasRemaining()) {
+                       updateLength(lengthBuffer.getInt(0));
                }
+               return bytesToRead;
        }
 
-       Optional<MemorySegment> getUnconsumedSegment() throws IOException {
-               // for the case of only partial length, no data
-               final int position = lengthBuffer.position();
-               if (position > 0) {
-                       MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(position);
-                       lengthBuffer.position(0);
-                       segment.put(0, lengthBuffer, position);
-                       return Optional.of(segment);
+       private void updateLength(int length) throws IOException {
+               lengthBuffer.clear();
+               recordLength = length;
+               if (isAboveSpillingThreshold()) {
+                       spillingChannel = createSpillingChannel();
+               } else {
+                       ensureBufferCapacity(length);
                }
+       }
 
-               // for the case of full length, partial data in buffer
-               if (recordLength > THRESHOLD_FOR_SPILLING) {
-                       throw new UnsupportedOperationException("Unaligned 
checkpoint currently do not support spilled " +
-                               "records.");
-               } else if (recordLength != -1) {
-                       int leftOverSize = leftOverLimit - leftOverStart;
-                       int unconsumedSize = Integer.BYTES + 
accumulatedRecordBytes + leftOverSize;
-                       DataOutputSerializer serializer = new 
DataOutputSerializer(unconsumedSize);
-                       serializer.writeInt(recordLength);
-                       serializer.write(buffer, 0, accumulatedRecordBytes);
-                       if (leftOverData != null) {
-                               serializer.write(leftOverData, leftOverStart, 
leftOverSize);
-                       }
-                       MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
-                       segment.put(0, serializer.getSharedBuffer(), 0, 
segment.size());
-                       return Optional.of(segment);
+       Optional<MemorySegment> getUnconsumedSegment() throws IOException {
+               if (isReadingLength()) {
+                       return Optional.of(copyLengthBuffer());
+               } else if (isAboveSpillingThreshold()) {
+                       throw new UnsupportedOperationException("Unaligned 
checkpoint currently do not support spilled records.");
+               } else if (recordLength == -1) {
+                       return Optional.empty(); // no remaining partial length 
or data
+               } else {
+                       return Optional.of(copyDataBuffer());
                }
+       }
 
-               // for the case of no remaining partial length or data
-               return Optional.empty();
+       private MemorySegment copyLengthBuffer() {
+               int position = lengthBuffer.position();
+               MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(position);
+               lengthBuffer.position(0);
+               segment.put(0, lengthBuffer, position);
+               return segment;
        }
 
-       void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper 
deserializer) {
-               deserializer.clear();
+       private MemorySegment copyDataBuffer() throws IOException {
+               int leftOverSize = leftOverLimit - leftOverStart;
+               int unconsumedSize = LENGTH_BYTES + accumulatedRecordBytes + 
leftOverSize;
+               DataOutputSerializer serializer = new 
DataOutputSerializer(unconsumedSize);
+               serializer.writeInt(recordLength);
+               serializer.write(buffer, 0, accumulatedRecordBytes);
+               if (leftOverData != null) {
+                       serializer.write(leftOverData, leftOverStart, 
leftOverSize);
+               }
+               MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
+               segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
+               return segment;
+       }
 
+       /**
+        * Copies the leftover data and transfers the "ownership" (i.e. clears 
this wrapper).
+        */
+       void transferLeftOverTo(NonSpanningWrapper nonSpanningWrapper) {
+               nonSpanningWrapper.clear();
                if (leftOverData != null) {
-                       deserializer.initializeFromMemorySegment(leftOverData, 
leftOverStart, leftOverLimit);
+                       
nonSpanningWrapper.initializeFromMemorySegment(leftOverData, leftOverStart, 
leftOverLimit);
                }
+               clear();
        }
 
        boolean hasFullRecord() {
-               return this.recordLength >= 0 && this.accumulatedRecordBytes >= 
this.recordLength;
+               return recordLength >= 0 && accumulatedRecordBytes >= 
recordLength;
        }
 
        int getNumGatheredBytes() {
-               return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 
4 : lengthBuffer.position());
+               return accumulatedRecordBytes + (recordLength >= 0 ? 
LENGTH_BYTES : lengthBuffer.position());
        }
 
+       @SuppressWarnings("ResultOfMethodCallIgnored")
        public void clear() {
-               this.buffer = initialBuffer;
-               this.serializationReadBuffer.releaseArrays();
-
-               this.recordLength = -1;
-               this.lengthBuffer.clear();
-               this.leftOverData = null;
-               this.leftOverStart = 0;
-               this.leftOverLimit = 0;
-               this.accumulatedRecordBytes = 0;
-
-               if (spillingChannel != null) {
-                       try {
-                               spillingChannel.close();
-                       }
-                       catch (Throwable t) {
-                               // ignore
-                       }
-                       spillingChannel = null;
-               }
-               if (spillFileReader != null) {
-                       try {
-                               spillFileReader.close();
-                       }
-                       catch (Throwable t) {
-                               // ignore
-                       }
-                       spillFileReader = null;
-               }
-               if (spillFile != null) {
-                       spillFile.delete();
-                       spillFile = null;
-               }
+               buffer = initialBuffer;
+               serializationReadBuffer.releaseArrays();
+
+               recordLength = -1;
+               lengthBuffer.clear();
+               leftOverData = null;
+               leftOverStart = 0;
+               leftOverLimit = 0;
+               accumulatedRecordBytes = 0;
+
+               closeAllQuietly(spillingChannel, spillFileReader, () -> 
spillFile.delete());
+               spillingChannel = null;
+               spillFileReader = null;
+               spillFile = null;
        }
 
        public DataInputView getInputView() {
-               if (spillFileReader == null) {
-                       return serializationReadBuffer;
-               }
-               else {
-                       return spillFileReader;
-               }
+               return spillFileReader == null ? serializationReadBuffer : 
spillFileReader;
        }
 
        private void ensureBufferCapacity(int minLength) {
                if (buffer.length < minLength) {
-                       byte[] newBuffer = new byte[Math.max(minLength, 
buffer.length * 2)];
+                       byte[] newBuffer = new byte[max(minLength, 
buffer.length * 2)];
                        System.arraycopy(buffer, 0, newBuffer, 0, 
accumulatedRecordBytes);
                        buffer = newBuffer;
                }
@@ -294,4 +274,16 @@ final class SpanningWrapper {
                random.nextBytes(bytes);
                return StringUtils.byteToHexString(bytes);
        }
+
+       private int spill(NonSpanningWrapper partial) throws IOException {
+               ByteBuffer buffer = partial.wrapIntoByteBuffer();
+               int length = buffer.remaining();
+               writeCompletely(spillingChannel, buffer);
+               return length;
+       }
+
+       private boolean isReadingLength() {
+               return lengthBuffer.position() > 0;
+       }
+
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index f20fbc9..75e6b0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,19 +24,22 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.Optional;
 
+import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
+import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
+import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
+
 /**
  * @param <T> The type of the record to be deserialized.
  */
 public class SpillingAdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> implements RecordDeserializer<T> {
 
-       private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
-                                       "Serializer consumed more bytes than 
the record had. " +
-                                       "This indicates broken serialization. 
If you are using custom serialization types " +
-                                       "(Value or Writable), check their 
serialization methods. If you are using a " +
-                                       "Kryo-serialized type, check the 
corresponding Kryo serializer.";
+       static final int LENGTH_BYTES = Integer.BYTES;
 
        private final NonSpanningWrapper nonSpanningWrapper;
 
@@ -58,11 +61,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                int numBytes = buffer.getSize();
 
                // check if some spanning record deserialization is pending
-               if (this.spanningWrapper.getNumGatheredBytes() > 0) {
-                       
this.spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes);
-               }
-               else {
-                       
this.nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + 
offset);
+               if (spanningWrapper.getNumGatheredBytes() > 0) {
+                       spanningWrapper.addNextChunkFromMemorySegment(segment, 
offset, numBytes);
+               } else {
+                       nonSpanningWrapper.initializeFromMemorySegment(segment, 
offset, numBytes + offset);
                }
        }
 
@@ -75,14 +77,13 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
 
        @Override
        public Optional<Buffer> getUnconsumedBuffer() throws IOException {
-               Optional<MemorySegment> target;
-               if (nonSpanningWrapper.remaining() > 0) {
-                       target = nonSpanningWrapper.getUnconsumedSegment();
+               final Optional<MemorySegment> unconsumedSegment;
+               if (nonSpanningWrapper.hasRemaining()) {
+                       unconsumedSegment = 
nonSpanningWrapper.getUnconsumedSegment();
                } else {
-                       target = spanningWrapper.getUnconsumedSegment();
+                       unconsumedSegment = 
spanningWrapper.getUnconsumedSegment();
                }
-               return target.map(memorySegment -> new NetworkBuffer(
-                       memorySegment, FreeingBufferRecycler.INSTANCE, 
Buffer.DataType.DATA_BUFFER, memorySegment.size()));
+               return unconsumedSegment.map(segment -> new 
NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, 
segment.size()));
        }
 
        @Override
@@ -91,65 +92,31 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                // this should be the majority of the cases for small records
                // for large records, this portion of the work is very small in 
comparison anyways
 
-               int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
-
-               // check if we can get a full length;
-               if (nonSpanningRemaining >= 4) {
-                       int len = this.nonSpanningWrapper.readInt();
-
-                       if (len <= nonSpanningRemaining - 4) {
-                               // we can get a full record from here
-                               try {
-                                       target.read(this.nonSpanningWrapper);
-
-                                       int remaining = 
this.nonSpanningWrapper.remaining();
-                                       if (remaining > 0) {
-                                               return 
DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-                                       }
-                                       else if (remaining == 0) {
-                                               return 
DeserializationResult.LAST_RECORD_FROM_BUFFER;
-                                       }
-                                       else {
-                                               throw new 
IndexOutOfBoundsException("Remaining = " + remaining);
-                                       }
-                               }
-                               catch (IndexOutOfBoundsException e) {
-                                       throw new 
IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
-                               }
-                       }
-                       else {
-                               // we got the length, but we need the rest from 
the spanning deserializer
-                               // and need to wait for more buffers
-                               
this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
-                               this.nonSpanningWrapper.clear();
-                               return DeserializationResult.PARTIAL_RECORD;
-                       }
-               } else if (nonSpanningRemaining > 0) {
-                       // we have an incomplete length
-                       // add our part of the length to the length buffer
-                       
this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
-                       this.nonSpanningWrapper.clear();
-                       return DeserializationResult.PARTIAL_RECORD;
-               }
+               if (nonSpanningWrapper.hasCompleteLength()) {
+                       return readNonSpanningRecord(target);
 
-               // spanning record case
-               if (this.spanningWrapper.hasFullRecord()) {
-                       // get the full record
-                       target.read(this.spanningWrapper.getInputView());
+               } else if (nonSpanningWrapper.hasRemaining()) {
+                       
nonSpanningWrapper.transferTo(spanningWrapper.lengthBuffer);
+                       return PARTIAL_RECORD;
 
-                       // move the remainder to the non-spanning wrapper
-                       // this does not copy it, only sets the memory segment
-                       
this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
-                       this.spanningWrapper.clear();
+               } else if (spanningWrapper.hasFullRecord()) {
+                       target.read(spanningWrapper.getInputView());
+                       spanningWrapper.transferLeftOverTo(nonSpanningWrapper);
+                       return nonSpanningWrapper.hasRemaining() ? 
INTERMEDIATE_RECORD_FROM_BUFFER : LAST_RECORD_FROM_BUFFER;
 
-                       return (this.nonSpanningWrapper.remaining() == 0) ?
-                               DeserializationResult.LAST_RECORD_FROM_BUFFER :
-                               
DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
                } else {
-                       return DeserializationResult.PARTIAL_RECORD;
+                       return PARTIAL_RECORD;
                }
        }
 
+       private DeserializationResult readNonSpanningRecord(T target) throws 
IOException {
+               NextRecordResponse response = 
nonSpanningWrapper.getNextRecord(target);
+               if (response.result == PARTIAL_RECORD) {
+                       spanningWrapper.transferFrom(nonSpanningWrapper, 
response.bytesLeft);
+               }
+               return response.result;
+       }
+
        @Override
        public void clear() {
                this.nonSpanningWrapper.clear();
@@ -158,7 +125,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
 
        @Override
        public boolean hasUnfinishedData() {
-               return this.nonSpanningWrapper.remaining() > 0 || 
this.spanningWrapper.getNumGatheredBytes() > 0;
+               return this.nonSpanningWrapper.hasRemaining() || 
this.spanningWrapper.getNumGatheredBytes() > 0;
        }
 
+       @NotThreadSafe
+       static class NextRecordResponse {
+               DeserializationResult result;
+               int bytesLeft;
+
+               NextRecordResponse(DeserializationResult result, int bytesLeft) 
{
+                       this.result = result;
+                       this.bytesLeft = bytesLeft;
+               }
+
+               public NextRecordResponse updated(DeserializationResult result, 
int bytesLeft) {
+                       this.result = result;
+                       this.bytesLeft = bytesLeft;
+                       return this;
+               }
+       }
 }

Reply via email to