rkhachatryan commented on a change in pull request #12120: URL: https://github.com/apache/flink/pull/12120#discussion_r424941876
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java ########## @@ -82,187 +94,171 @@ public SpanningWrapper(String[] tempDirs) { this.buffer = initialBuffer; } - void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException { - // set the length and copy what is available to the buffer - this.recordLength = nextRecordLength; - - final int numBytesChunk = partial.remaining(); - - if (nextRecordLength > THRESHOLD_FOR_SPILLING) { - // create a spilling channel and put the data there - this.spillingChannel = createSpillingChannel(); - - ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk); - FileUtils.writeCompletely(this.spillingChannel, toWrite); - } - else { - // collect in memory - ensureBufferCapacity(nextRecordLength); - partial.segment.get(partial.position, buffer, 0, numBytesChunk); - } - - this.accumulatedRecordBytes = numBytesChunk; + /** + * We got the length, but we need the rest from the spanning deserializer and need to wait for more buffers. + */ + void transferFrom(NonSpanningWrapper partial, int nextRecordLength) throws IOException { + updateLength(nextRecordLength); + accumulatedRecordBytes = isAboveSpillingThreshold() ? spill(partial) : partial.copyTo(buffer); + partial.clear(); } - void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException { - // copy what we have to the length buffer - partial.segment.get(partial.position, this.lengthBuffer, partial.remaining()); + private boolean isAboveSpillingThreshold() { + return recordLength > THRESHOLD_FOR_SPILLING; } void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException { - int segmentPosition = offset; - int segmentRemaining = numBytes; - // check where to go. 
if we have a partial length, we need to complete it first - if (this.lengthBuffer.position() > 0) { - int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining); - segment.get(segmentPosition, this.lengthBuffer, toPut); - // did we complete the length? - if (this.lengthBuffer.hasRemaining()) { - return; - } else { - this.recordLength = this.lengthBuffer.getInt(0); - - this.lengthBuffer.clear(); - segmentPosition += toPut; - segmentRemaining -= toPut; - if (this.recordLength > THRESHOLD_FOR_SPILLING) { - this.spillingChannel = createSpillingChannel(); - } else { - ensureBufferCapacity(this.recordLength); - } - } + Tuple2<Integer, Integer> positionAndRemaining = readLengthIfNeeded(segment, offset, numBytes); + int startPos = positionAndRemaining.f0; + int remaining = positionAndRemaining.f1; + if (remaining == 0) { + return; } - // copy as much as we need or can for this next spanning record - int needed = this.recordLength - this.accumulatedRecordBytes; - int toCopy = Math.min(needed, segmentRemaining); + int toCopy = min(recordLength - accumulatedRecordBytes, remaining); + if (toCopy > 0) { + readData(segment, startPos, toCopy); + } + if (remaining > toCopy) { + leftOverData = segment; + leftOverStart = startPos + toCopy; + leftOverLimit = offset + numBytes; + } + } - if (spillingChannel != null) { - // spill to file - ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy); - FileUtils.writeCompletely(this.spillingChannel, toWrite); + private void readData(MemorySegment segment, int offset, int length) throws IOException { + if (spillingChannel == null) { + readIntoBuffer(segment, offset, length); } else { - segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy); + readIntoFile(segment, offset, length); } + } - this.accumulatedRecordBytes += toCopy; + private void readIntoFile(MemorySegment segment, int offset, int length) throws IOException { + writeCompletely(spillingChannel, segment.wrap(offset, length)); + 
accumulatedRecordBytes += length; + if (hasFullRecord()) { + spillingChannel.close(); + spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE)); + } + } - if (toCopy < segmentRemaining) { - // there is more data in the segment - this.leftOverData = segment; - this.leftOverStart = segmentPosition + toCopy; - this.leftOverLimit = numBytes + offset; + private void readIntoBuffer(MemorySegment segment, int offset, int length) { + segment.get(offset, buffer, accumulatedRecordBytes, length); + accumulatedRecordBytes += length; + if (hasFullRecord()) { + serializationReadBuffer.setBuffer(buffer, 0, recordLength); } + } - if (accumulatedRecordBytes == recordLength) { - // we have the full record - if (spillingChannel == null) { - this.serializationReadBuffer.setBuffer(buffer, 0, recordLength); - } - else { - spillingChannel.close(); + private Tuple2<Integer, Integer> readLengthIfNeeded(MemorySegment segment, int offset, int numBytes) throws IOException { + if (isReadingLength()) { + return readLength(segment, offset, numBytes); + } else { + return Tuple2.of(offset, numBytes); + } + } - BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024); - this.spillFileReader = new DataInputViewStreamWrapper(inStream); - } + private Tuple2<Integer, Integer> readLength(MemorySegment segment, int segmentPosition, int segmentRemaining) throws IOException { + int numBytes = min(lengthBuffer.remaining(), segmentRemaining); + segment.get(segmentPosition, lengthBuffer, numBytes); + if (lengthBuffer.hasRemaining()) { + return POSITION_AND_SIZE_IF_LENGTH_NOT_FULLY_READ; Review comment: This constant avoids creating the same `Tuple2` on every call (and makes the code more readable). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org