This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9ebdaf40973e1b7ff6aaadd19d1d6cda1d3e69d8 Author: Roman Khachatryan <[email protected]> AuthorDate: Tue May 12 11:24:01 2020 +0200 [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer --- .../main/java/org/apache/flink/util/IOUtils.java | 9 + .../api/serialization/NonSpanningWrapper.java | 81 +++++- .../network/api/serialization/SpanningWrapper.java | 278 ++++++++++----------- ...SpillingAdaptiveSpanningRecordDeserializer.java | 121 ++++----- 4 files changed, 271 insertions(+), 218 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java index 02b11e6..1f9af18 100644 --- a/flink-core/src/main/java/org/apache/flink/util/IOUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/IOUtils.java @@ -26,6 +26,8 @@ import java.io.OutputStream; import java.io.PrintStream; import java.net.Socket; +import static java.util.Arrays.asList; + /** * An utility class for I/O related functionality. */ @@ -244,6 +246,13 @@ public final class IOUtils { /** * Closes all elements in the iterable with closeQuietly(). */ + public static void closeAllQuietly(AutoCloseable... closeables) { + closeAllQuietly(asList(closeables)); + } + + /** + * Closes all elements in the iterable with closeQuietly(). + */ public static void closeAllQuietly(Iterable<? extends AutoCloseable> closeables) { if (null != closeables) { for (AutoCloseable closeable : closeables) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java index 6d9602f..5de5467 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java @@ -17,27 +17,43 @@ package org.apache.flink.runtime.io.network.api.serialization; +import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse; import java.io.EOFException; import java.io.IOException; import java.io.UTFDataFormatException; +import java.nio.ByteBuffer; import java.util.Optional; +import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; +import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER; +import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD; +import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES; + final class NonSpanningWrapper implements DataInputView { - MemorySegment segment; + private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE = + "Serializer consumed more bytes than the record had. " + + "This indicates broken serialization. If you are using custom serialization types " + + "(Value or Writable), check their serialization methods. If you are using a " + + "Kryo-serialized type, check the corresponding Kryo serializer."; + + private MemorySegment segment; private int limit; - int position; + private int position; private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding - int remaining() { + private final NextRecordResponse reusedNextRecordResponse = new NextRecordResponse(null, 0); // performance impact of immutable objects not benchmarked + + private int remaining() { return this.limit - this.position; } @@ -47,14 +63,14 @@ final class NonSpanningWrapper implements DataInputView { this.position = 0; } - void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) { + void initializeFromMemorySegment(MemorySegment seg, int position, int limit) { this.segment = seg; this.position = position; - this.limit = leftOverLimit; + this.limit = limit; } Optional<MemorySegment> getUnconsumedSegment() { - if (remaining() == 0) { + if (!hasRemaining()) { return Optional.empty(); } MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(remaining()); @@ -62,6 +78,10 @@ final class NonSpanningWrapper implements DataInputView { return Optional.of(target); } + boolean hasRemaining() { + return remaining() > 0; + } + // ------------------------------------------------------------------------------------------------------------- // DataInput specific methods // ------------------------------------------------------------------------------------------------------------- @@ -290,4 +310,53 @@ final class NonSpanningWrapper implements DataInputView { public int read(byte[] b) { return read(b, 0, b.length); } + + ByteBuffer wrapIntoByteBuffer() { + return segment.wrap(position, remaining()); + } + + int copyContentTo(byte[] dst) { + final int numBytesChunk = remaining(); + segment.get(position, dst, 0, numBytesChunk); + return numBytesChunk; + } + + /** + * Copies the data and transfers the "ownership" (i.e. clears current wrapper). + */ + void transferTo(ByteBuffer dst) { + segment.get(position, dst, remaining()); + clear(); + } + + NextRecordResponse getNextRecord(IOReadableWritable target) throws IOException { + int recordLen = readInt(); + if (canReadRecord(recordLen)) { + return readInto(target); + } else { + return reusedNextRecordResponse.updated(PARTIAL_RECORD, recordLen); + } + } + + private NextRecordResponse readInto(IOReadableWritable target) throws IOException { + try { + target.read(this); + } catch (IndexOutOfBoundsException e) { + throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e); + } + int remaining = remaining(); + if (remaining < 0) { + throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, new IndexOutOfBoundsException("Remaining = " + remaining)); + } + return reusedNextRecordResponse.updated(remaining == 0 ? LAST_RECORD_FROM_BUFFER : INTERMEDIATE_RECORD_FROM_BUFFER, remaining); + } + + boolean hasCompleteLength() { + return remaining() >= LENGTH_BYTES; + } + + private boolean canReadRecord(int recordLength) { + return recordLength <= remaining(); + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java index e59363f..430f0db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java @@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.util.FileUtils; import org.apache.flink.util.StringUtils; import java.io.BufferedInputStream; @@ -38,9 +37,16 @@ import java.util.Arrays; import java.util.Optional; import java.util.Random; +import static java.lang.Math.max; +import static java.lang.Math.min; +import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES; +import static org.apache.flink.util.FileUtils.writeCompletely; +import static org.apache.flink.util.IOUtils.closeAllQuietly; + final class SpanningWrapper { private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes + private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024; private final byte[] initialBuffer = new byte[1024]; @@ -50,7 +56,7 @@ final class SpanningWrapper { private final DataInputDeserializer serializationReadBuffer; - private final ByteBuffer lengthBuffer; + final ByteBuffer lengthBuffer; private FileChannel spillingChannel; @@ -70,10 +76,10 @@ final class SpanningWrapper { private DataInputViewStreamWrapper spillFileReader; - public SpanningWrapper(String[] tempDirs) { + SpanningWrapper(String[] tempDirs) { this.tempDirs = tempDirs; - this.lengthBuffer = ByteBuffer.allocate(4); + this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES); this.lengthBuffer.order(ByteOrder.BIG_ENDIAN); this.recordLength = -1; @@ -82,187 +88,161 @@ final class SpanningWrapper { this.buffer = initialBuffer; } - void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException { - // set the length and copy what is available to the buffer - this.recordLength = nextRecordLength; - - final int numBytesChunk = partial.remaining(); - - if (nextRecordLength > THRESHOLD_FOR_SPILLING) { - // create a spilling channel and put the data there - this.spillingChannel = createSpillingChannel(); - - ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk); - FileUtils.writeCompletely(this.spillingChannel, toWrite); - } - else { - // collect in memory - ensureBufferCapacity(nextRecordLength); - partial.segment.get(partial.position, buffer, 0, numBytesChunk); - } - - this.accumulatedRecordBytes = numBytesChunk; + /** + * Copies the data and transfers the "ownership" (i.e. clears the passed wrapper). + */ + void transferFrom(NonSpanningWrapper partial, int nextRecordLength) throws IOException { + updateLength(nextRecordLength); + accumulatedRecordBytes = isAboveSpillingThreshold() ? spill(partial) : partial.copyContentTo(buffer); + partial.clear(); } - void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException { - // copy what we have to the length buffer - partial.segment.get(partial.position, this.lengthBuffer, partial.remaining()); + private boolean isAboveSpillingThreshold() { + return recordLength > THRESHOLD_FOR_SPILLING; } void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException { - int segmentPosition = offset; - int segmentRemaining = numBytes; - // check where to go. if we have a partial length, we need to complete it first - if (this.lengthBuffer.position() > 0) { - int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining); - segment.get(segmentPosition, this.lengthBuffer, toPut); - // did we complete the length? - if (this.lengthBuffer.hasRemaining()) { - return; - } else { - this.recordLength = this.lengthBuffer.getInt(0); - - this.lengthBuffer.clear(); - segmentPosition += toPut; - segmentRemaining -= toPut; - if (this.recordLength > THRESHOLD_FOR_SPILLING) { - this.spillingChannel = createSpillingChannel(); - } else { - ensureBufferCapacity(this.recordLength); - } - } + int limit = offset + numBytes; + int numBytesRead = isReadingLength() ? readLength(segment, offset, numBytes) : 0; + offset += numBytesRead; + numBytes -= numBytesRead; + if (numBytes == 0) { + return; } - // copy as much as we need or can for this next spanning record - int needed = this.recordLength - this.accumulatedRecordBytes; - int toCopy = Math.min(needed, segmentRemaining); + int toCopy = min(recordLength - accumulatedRecordBytes, numBytes); + if (toCopy > 0) { + copyFromSegment(segment, offset, toCopy); + } + if (numBytes > toCopy) { + leftOverData = segment; + leftOverStart = offset + toCopy; + leftOverLimit = limit; + } + } - if (spillingChannel != null) { - // spill to file - ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy); - FileUtils.writeCompletely(this.spillingChannel, toWrite); + private void copyFromSegment(MemorySegment segment, int offset, int length) throws IOException { + if (spillingChannel == null) { + copyIntoBuffer(segment, offset, length); } else { - segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy); + copyIntoFile(segment, offset, length); } + } - this.accumulatedRecordBytes += toCopy; - - if (toCopy < segmentRemaining) { - // there is more data in the segment - this.leftOverData = segment; - this.leftOverStart = segmentPosition + toCopy; - this.leftOverLimit = numBytes + offset; + private void copyIntoFile(MemorySegment segment, int offset, int length) throws IOException { + writeCompletely(spillingChannel, segment.wrap(offset, length)); + accumulatedRecordBytes += length; + if (hasFullRecord()) { + spillingChannel.close(); + spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE)); } + } - if (accumulatedRecordBytes == recordLength) { - // we have the full record - if (spillingChannel == null) { - this.serializationReadBuffer.setBuffer(buffer, 0, recordLength); - } - else { - spillingChannel.close(); + private void copyIntoBuffer(MemorySegment segment, int offset, int length) { + segment.get(offset, buffer, accumulatedRecordBytes, length); + accumulatedRecordBytes += length; + if (hasFullRecord()) { + serializationReadBuffer.setBuffer(buffer, 0, recordLength); + } + } - BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024); - this.spillFileReader = new DataInputViewStreamWrapper(inStream); - } + private int readLength(MemorySegment segment, int segmentPosition, int segmentRemaining) throws IOException { + int bytesToRead = min(lengthBuffer.remaining(), segmentRemaining); + segment.get(segmentPosition, lengthBuffer, bytesToRead); + if (!lengthBuffer.hasRemaining()) { + updateLength(lengthBuffer.getInt(0)); } + return bytesToRead; } - Optional<MemorySegment> getUnconsumedSegment() throws IOException { - // for the case of only partial length, no data - final int position = lengthBuffer.position(); - if (position > 0) { - MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position); - lengthBuffer.position(0); - segment.put(0, lengthBuffer, position); - return Optional.of(segment); + private void updateLength(int length) throws IOException { + lengthBuffer.clear(); + recordLength = length; + if (isAboveSpillingThreshold()) { + spillingChannel = createSpillingChannel(); + } else { + ensureBufferCapacity(length); } + } - // for the case of full length, partial data in buffer - if (recordLength > THRESHOLD_FOR_SPILLING) { - throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled " + - "records."); - } else if (recordLength != -1) { - int leftOverSize = leftOverLimit - leftOverStart; - int unconsumedSize = Integer.BYTES + accumulatedRecordBytes + leftOverSize; - DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize); - serializer.writeInt(recordLength); - serializer.write(buffer, 0, accumulatedRecordBytes); - if (leftOverData != null) { - serializer.write(leftOverData, leftOverStart, leftOverSize); - } - MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize); - segment.put(0, serializer.getSharedBuffer(), 0, segment.size()); - return Optional.of(segment); + Optional<MemorySegment> getUnconsumedSegment() throws IOException { + if (isReadingLength()) { + return Optional.of(copyLengthBuffer()); + } else if (isAboveSpillingThreshold()) { + throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled records."); + } else if (recordLength == -1) { + return Optional.empty(); // no remaining partial length or data + } else { + return Optional.of(copyDataBuffer()); } + } - // for the case of no remaining partial length or data - return Optional.empty(); + private MemorySegment copyLengthBuffer() { + int position = lengthBuffer.position(); + MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position); + lengthBuffer.position(0); + segment.put(0, lengthBuffer, position); + return segment; } - void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) { - deserializer.clear(); + private MemorySegment copyDataBuffer() throws IOException { + int leftOverSize = leftOverLimit - leftOverStart; + int unconsumedSize = LENGTH_BYTES + accumulatedRecordBytes + leftOverSize; + DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize); + serializer.writeInt(recordLength); + serializer.write(buffer, 0, accumulatedRecordBytes); + if (leftOverData != null) { + serializer.write(leftOverData, leftOverStart, leftOverSize); + } + MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize); + segment.put(0, serializer.getSharedBuffer(), 0, segment.size()); + return segment; + } + /** + * Copies the leftover data and transfers the "ownership" (i.e. clears this wrapper). + */ + void transferLeftOverTo(NonSpanningWrapper nonSpanningWrapper) { + nonSpanningWrapper.clear(); if (leftOverData != null) { - deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit); + nonSpanningWrapper.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit); } + clear(); } boolean hasFullRecord() { - return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength; + return recordLength >= 0 && accumulatedRecordBytes >= recordLength; } int getNumGatheredBytes() { - return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position()); + return accumulatedRecordBytes + (recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position()); } + @SuppressWarnings("ResultOfMethodCallIgnored") public void clear() { - this.buffer = initialBuffer; - this.serializationReadBuffer.releaseArrays(); - - this.recordLength = -1; - this.lengthBuffer.clear(); - this.leftOverData = null; - this.leftOverStart = 0; - this.leftOverLimit = 0; - this.accumulatedRecordBytes = 0; - - if (spillingChannel != null) { - try { - spillingChannel.close(); - } - catch (Throwable t) { - // ignore - } - spillingChannel = null; - } - if (spillFileReader != null) { - try { - spillFileReader.close(); - } - catch (Throwable t) { - // ignore - } - spillFileReader = null; - } - if (spillFile != null) { - spillFile.delete(); - spillFile = null; - } + buffer = initialBuffer; + serializationReadBuffer.releaseArrays(); + + recordLength = -1; + lengthBuffer.clear(); + leftOverData = null; + leftOverStart = 0; + leftOverLimit = 0; + accumulatedRecordBytes = 0; + + closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.delete()); + spillingChannel = null; + spillFileReader = null; + spillFile = null; } public DataInputView getInputView() { - if (spillFileReader == null) { - return serializationReadBuffer; - } - else { - return spillFileReader; - } + return spillFileReader == null ? serializationReadBuffer : spillFileReader; } private void ensureBufferCapacity(int minLength) { if (buffer.length < minLength) { - byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)]; + byte[] newBuffer = new byte[max(minLength, buffer.length * 2)]; System.arraycopy(buffer, 0, newBuffer, 0, accumulatedRecordBytes); buffer = newBuffer; } @@ -294,4 +274,16 @@ final class SpanningWrapper { random.nextBytes(bytes); return StringUtils.byteToHexString(bytes); } + + private int spill(NonSpanningWrapper partial) throws IOException { + ByteBuffer buffer = partial.wrapIntoByteBuffer(); + int length = buffer.remaining(); + writeCompletely(spillingChannel, buffer); + return length; + } + + private boolean isReadingLength() { + return lengthBuffer.position() > 0; + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index f20fbc9..75e6b0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -24,19 +24,22 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import javax.annotation.concurrent.NotThreadSafe; + import java.io.IOException; import java.util.Optional; +import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; +import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER; +import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD; +import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER; + /** * @param <T> The type of the record to be deserialized. */ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> { - private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE = - "Serializer consumed more bytes than the record had. " + - "This indicates broken serialization. If you are using custom serialization types " + - "(Value or Writable), check their serialization methods. If you are using a " + - "Kryo-serialized type, check the corresponding Kryo serializer."; + static final int LENGTH_BYTES = Integer.BYTES; private final NonSpanningWrapper nonSpanningWrapper; @@ -58,11 +61,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit int numBytes = buffer.getSize(); // check if some spanning record deserialization is pending - if (this.spanningWrapper.getNumGatheredBytes() > 0) { - this.spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes); - } - else { - this.nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset); + if (spanningWrapper.getNumGatheredBytes() > 0) { + spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes); + } else { + nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset); } } @@ -75,14 +77,13 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit @Override public Optional<Buffer> getUnconsumedBuffer() throws IOException { - Optional<MemorySegment> target; - if (nonSpanningWrapper.remaining() > 0) { - target = nonSpanningWrapper.getUnconsumedSegment(); + final Optional<MemorySegment> unconsumedSegment; + if (nonSpanningWrapper.hasRemaining()) { + unconsumedSegment = nonSpanningWrapper.getUnconsumedSegment(); } else { - target = spanningWrapper.getUnconsumedSegment(); + unconsumedSegment = spanningWrapper.getUnconsumedSegment(); } - return target.map(memorySegment -> new NetworkBuffer( - memorySegment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, memorySegment.size())); + return unconsumedSegment.map(segment -> new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, segment.size())); } @Override @@ -91,65 +92,31 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit // this should be the majority of the cases for small records // for large records, this portion of the work is very small in comparison anyways - int nonSpanningRemaining = this.nonSpanningWrapper.remaining(); - - // check if we can get a full length; - if (nonSpanningRemaining >= 4) { - int len = this.nonSpanningWrapper.readInt(); - - if (len <= nonSpanningRemaining - 4) { - // we can get a full record from here - try { - target.read(this.nonSpanningWrapper); - - int remaining = this.nonSpanningWrapper.remaining(); - if (remaining > 0) { - return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; - } - else if (remaining == 0) { - return DeserializationResult.LAST_RECORD_FROM_BUFFER; - } - else { - throw new IndexOutOfBoundsException("Remaining = " + remaining); - } - } - catch (IndexOutOfBoundsException e) { - throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e); - } - } - else { - // we got the length, but we need the rest from the spanning deserializer - // and need to wait for more buffers - this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len); - this.nonSpanningWrapper.clear(); - return DeserializationResult.PARTIAL_RECORD; - } - } else if (nonSpanningRemaining > 0) { - // we have an incomplete length - // add our part of the length to the length buffer - this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper); - this.nonSpanningWrapper.clear(); - return DeserializationResult.PARTIAL_RECORD; - } + if (nonSpanningWrapper.hasCompleteLength()) { + return readNonSpanningRecord(target); - // spanning record case - if (this.spanningWrapper.hasFullRecord()) { - // get the full record - target.read(this.spanningWrapper.getInputView()); + } else if (nonSpanningWrapper.hasRemaining()) { + nonSpanningWrapper.transferTo(spanningWrapper.lengthBuffer); + return PARTIAL_RECORD; - // move the remainder to the non-spanning wrapper - // this does not copy it, only sets the memory segment - this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper); - this.spanningWrapper.clear(); + } else if (spanningWrapper.hasFullRecord()) { + target.read(spanningWrapper.getInputView()); + spanningWrapper.transferLeftOverTo(nonSpanningWrapper); + return nonSpanningWrapper.hasRemaining() ? INTERMEDIATE_RECORD_FROM_BUFFER : LAST_RECORD_FROM_BUFFER; - return (this.nonSpanningWrapper.remaining() == 0) ? - DeserializationResult.LAST_RECORD_FROM_BUFFER : - DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; } else { - return DeserializationResult.PARTIAL_RECORD; + return PARTIAL_RECORD; } } + private DeserializationResult readNonSpanningRecord(T target) throws IOException { + NextRecordResponse response = nonSpanningWrapper.getNextRecord(target); + if (response.result == PARTIAL_RECORD) { + spanningWrapper.transferFrom(nonSpanningWrapper, response.bytesLeft); + } + return response.result; + } + @Override public void clear() { this.nonSpanningWrapper.clear(); @@ -158,7 +125,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit @Override public boolean hasUnfinishedData() { - return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0; + return this.nonSpanningWrapper.hasRemaining() || this.spanningWrapper.getNumGatheredBytes() > 0; } + @NotThreadSafe + static class NextRecordResponse { + DeserializationResult result; + int bytesLeft; + + NextRecordResponse(DeserializationResult result, int bytesLeft) { + this.result = result; + this.bytesLeft = bytesLeft; + } + + public NextRecordResponse updated(DeserializationResult result, int bytesLeft) { + this.result = result; + this.bytesLeft = bytesLeft; + return this; + } + } }
