rkhachatryan commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r424941876



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
##########
@@ -82,187 +94,171 @@ public SpanningWrapper(String[] tempDirs) {
                this.buffer = initialBuffer;
        }
 
-       void initializeWithPartialRecord(NonSpanningWrapper partial, int 
nextRecordLength) throws IOException {
-               // set the length and copy what is available to the buffer
-               this.recordLength = nextRecordLength;
-
-               final int numBytesChunk = partial.remaining();
-
-               if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
-                       // create a spilling channel and put the data there
-                       this.spillingChannel = createSpillingChannel();
-
-                       ByteBuffer toWrite = 
partial.segment.wrap(partial.position, numBytesChunk);
-                       FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
-               }
-               else {
-                       // collect in memory
-                       ensureBufferCapacity(nextRecordLength);
-                       partial.segment.get(partial.position, buffer, 0, 
numBytesChunk);
-               }
-
-               this.accumulatedRecordBytes = numBytesChunk;
+       /**
+        * We got the length, but we need the rest from the spanning 
deserializer and need to wait for more buffers.
+        */
+       void transferFrom(NonSpanningWrapper partial, int nextRecordLength) 
throws IOException {
+               updateLength(nextRecordLength);
+               accumulatedRecordBytes = isAboveSpillingThreshold() ? 
spill(partial) : partial.copyTo(buffer);
+               partial.clear();
        }
 
-       void initializeWithPartialLength(NonSpanningWrapper partial) throws 
IOException {
-               // copy what we have to the length buffer
-               partial.segment.get(partial.position, this.lengthBuffer, 
partial.remaining());
+       private boolean isAboveSpillingThreshold() {
+               return recordLength > THRESHOLD_FOR_SPILLING;
        }
 
        void addNextChunkFromMemorySegment(MemorySegment segment, int offset, 
int numBytes) throws IOException {
-               int segmentPosition = offset;
-               int segmentRemaining = numBytes;
-               // check where to go. if we have a partial length, we need to 
complete it first
-               if (this.lengthBuffer.position() > 0) {
-                       int toPut = Math.min(this.lengthBuffer.remaining(), 
segmentRemaining);
-                       segment.get(segmentPosition, this.lengthBuffer, toPut);
-                       // did we complete the length?
-                       if (this.lengthBuffer.hasRemaining()) {
-                               return;
-                       } else {
-                               this.recordLength = this.lengthBuffer.getInt(0);
-
-                               this.lengthBuffer.clear();
-                               segmentPosition += toPut;
-                               segmentRemaining -= toPut;
-                               if (this.recordLength > THRESHOLD_FOR_SPILLING) 
{
-                                       this.spillingChannel = 
createSpillingChannel();
-                               } else {
-                                       ensureBufferCapacity(this.recordLength);
-                               }
-                       }
+               Tuple2<Integer, Integer> positionAndRemaining = 
readLengthIfNeeded(segment, offset, numBytes);
+               int startPos = positionAndRemaining.f0;
+               int remaining = positionAndRemaining.f1;
+               if (remaining == 0) {
+                       return;
                }
 
-               // copy as much as we need or can for this next spanning record
-               int needed = this.recordLength - this.accumulatedRecordBytes;
-               int toCopy = Math.min(needed, segmentRemaining);
+               int toCopy = min(recordLength - accumulatedRecordBytes, 
remaining);
+               if (toCopy > 0) {
+                       readData(segment, startPos, toCopy);
+               }
+               if (remaining > toCopy) {
+                       leftOverData = segment;
+                       leftOverStart = startPos + toCopy;
+                       leftOverLimit = offset + numBytes;
+               }
+       }
 
-               if (spillingChannel != null) {
-                       // spill to file
-                       ByteBuffer toWrite = segment.wrap(segmentPosition, 
toCopy);
-                       FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
+       private void readData(MemorySegment segment, int offset, int length) 
throws IOException {
+               if (spillingChannel == null) {
+                       readIntoBuffer(segment, offset, length);
                } else {
-                       segment.get(segmentPosition, buffer, 
this.accumulatedRecordBytes, toCopy);
+                       readIntoFile(segment, offset, length);
                }
+       }
 
-               this.accumulatedRecordBytes += toCopy;
+       private void readIntoFile(MemorySegment segment, int offset, int 
length) throws IOException {
+               writeCompletely(spillingChannel, segment.wrap(offset, length));
+               accumulatedRecordBytes += length;
+               if (hasFullRecord()) {
+                       spillingChannel.close();
+                       spillFileReader = new DataInputViewStreamWrapper(new 
BufferedInputStream(new FileInputStream(spillFile), FILE_BUFFER_SIZE));
+               }
+       }
 
-               if (toCopy < segmentRemaining) {
-                       // there is more data in the segment
-                       this.leftOverData = segment;
-                       this.leftOverStart = segmentPosition + toCopy;
-                       this.leftOverLimit = numBytes + offset;
+       private void readIntoBuffer(MemorySegment segment, int offset, int 
length) {
+               segment.get(offset, buffer, accumulatedRecordBytes, length);
+               accumulatedRecordBytes += length;
+               if (hasFullRecord()) {
+                       serializationReadBuffer.setBuffer(buffer, 0, 
recordLength);
                }
+       }
 
-               if (accumulatedRecordBytes == recordLength) {
-                       // we have the full record
-                       if (spillingChannel == null) {
-                               this.serializationReadBuffer.setBuffer(buffer, 
0, recordLength);
-                       }
-                       else {
-                               spillingChannel.close();
+       private Tuple2<Integer, Integer> readLengthIfNeeded(MemorySegment 
segment, int offset, int numBytes) throws IOException {
+               if (isReadingLength()) {
+                       return readLength(segment, offset, numBytes);
+               } else {
+                       return Tuple2.of(offset, numBytes);
+               }
+       }
 
-                               BufferedInputStream inStream = new 
BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
-                               this.spillFileReader = new 
DataInputViewStreamWrapper(inStream);
-                       }
+       private Tuple2<Integer, Integer> readLength(MemorySegment segment, int 
segmentPosition, int segmentRemaining) throws IOException {
+               int numBytes = min(lengthBuffer.remaining(), segmentRemaining);
+               segment.get(segmentPosition, lengthBuffer, numBytes);
+               if (lengthBuffer.hasRemaining()) {
+                       return POSITION_AND_SIZE_IF_LENGTH_NOT_FULLY_READ;

Review comment:
       Returning the shared `POSITION_AND_SIZE_IF_LENGTH_NOT_FULLY_READ` constant avoids creating an identical `Tuple2` on every call (and makes the code more readable).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to