rkhachatryan commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r424950590



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
##########
@@ -47,27 +64,33 @@ void clear() {
                this.position = 0;
        }
 
-       void initializeFromMemorySegment(MemorySegment seg, int position, int 
leftOverLimit) {
+       void initializeFromMemorySegment(MemorySegment seg, int position, int 
limit) {
                this.segment = seg;
                this.position = position;
-               this.limit = leftOverLimit;
+               this.limit = limit;
        }
 
-       Optional<MemorySegment> getUnconsumedSegment() {
-               if (remaining() == 0) {
+       @Override
+       public Optional<MemorySegment> getUnconsumedSegment() {
+               if (hasRemaining()) {
+                       MemorySegment target = 
MemorySegmentFactory.allocateUnpooledSegment(remaining());
+                       segment.copyTo(position, target, 0, remaining());
+                       return Optional.of(target);
+               } else {
                        return Optional.empty();

Review comment:
       The other point of view is that `if` shouldn't negate the condition :)
   Will swap it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to