rkhachatryan commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r424938864



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
##########
@@ -183,22 +196,26 @@ private void updateLength(int length) throws IOException {
        @Override
        public CloseableIterator<Buffer> getUnconsumedSegment() throws 
IOException {
                if (isReadingLength()) {
-                       return singleBufferIterator(copyLengthBuffer());
+                       return 
singleBufferIterator(wrapCopy(lengthBuffer.array(), 0, 
lengthBuffer.position()));
                } else if (isAboveSpillingThreshold()) {
-                       throw new UnsupportedOperationException("Unaligned 
checkpoint currently do not support spilled records.");
+                       return createSpilledDataIterator();
                } else if (recordLength == -1) {
-                       return CloseableIterator.empty(); // no remaining 
partial length or data
+                       return empty(); // no remaining partial length or data
                } else {
                        return singleBufferIterator(copyDataBuffer());
                }
        }
 
-       private MemorySegment copyLengthBuffer() {
-               int position = lengthBuffer.position();
-               MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(position);
-               lengthBuffer.position(0);
-               segment.put(0, lengthBuffer, position);
-               return segment;
+       @SuppressWarnings("unchecked")
+       private CloseableIterator<Buffer> createSpilledDataIterator() throws 
IOException {
+               if (spillingChannel != null) {
+                       spillingChannel.force(false);
+               }
+               return CloseableIterator.flatten(
+                       toSingleBufferIterator(wrapInt(recordLength)),
+                       new FileBasedBufferIterator(spillFile, 
min(accumulatedRecordBytes, recordLength), fileBufferSize),
+                       leftOverData == null ? empty() : 
toSingleBufferIterator(wrapCopy(leftOverData.getArray(), leftOverStart, 
leftOverLimit))
+               );

Review comment:
       It flattens 3 iterators, one of which can be empty. I tried to put it 
all into `FileBasedBufferIterator` but it doesn't simplify anything (as there 
will be many checks there) and makes it more error-prone.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to