rkhachatryan commented on a change in pull request #12120:
URL: https://github.com/apache/flink/pull/12120#discussion_r425069312



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
##########
@@ -91,62 +86,22 @@ public DeserializationResult getNextRecord(T target) throws 
IOException {
                // this should be the majority of the cases for small records
                // for large records, this portion of the work is very small in 
comparison anyways
 
-               int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
-
-               // check if we can get a full length;
-               if (nonSpanningRemaining >= 4) {
-                       int len = this.nonSpanningWrapper.readInt();
-
-                       if (len <= nonSpanningRemaining - 4) {
-                               // we can get a full record from here
-                               try {
-                                       target.read(this.nonSpanningWrapper);
-
-                                       int remaining = 
this.nonSpanningWrapper.remaining();
-                                       if (remaining > 0) {
-                                               return 
DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-                                       }
-                                       else if (remaining == 0) {
-                                               return 
DeserializationResult.LAST_RECORD_FROM_BUFFER;
-                                       }
-                                       else {
-                                               throw new 
IndexOutOfBoundsException("Remaining = " + remaining);
-                                       }
-                               }
-                               catch (IndexOutOfBoundsException e) {
-                                       throw new 
IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
-                               }
-                       }
-                       else {
-                               // we got the length, but we need the rest from 
the spanning deserializer
-                               // and need to wait for more buffers
-                               
this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
-                               this.nonSpanningWrapper.clear();
-                               return DeserializationResult.PARTIAL_RECORD;
-                       }
-               } else if (nonSpanningRemaining > 0) {
-                       // we have an incomplete length
-                       // add our part of the length to the length buffer
-                       
this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
-                       this.nonSpanningWrapper.clear();
-                       return DeserializationResult.PARTIAL_RECORD;
-               }
+               if (nonSpanningWrapper.canReadLength()) {

Review comment:
       Renamed as you suggested below.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to