rkhachatryan commented on a change in pull request #12120: URL: https://github.com/apache/flink/pull/12120#discussion_r425068399
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ########## @@ -91,62 +86,22 @@ public DeserializationResult getNextRecord(T target) throws IOException { // this should be the majority of the cases for small records // for large records, this portion of the work is very small in comparison anyways - int nonSpanningRemaining = this.nonSpanningWrapper.remaining(); - - // check if we can get a full length; - if (nonSpanningRemaining >= 4) { - int len = this.nonSpanningWrapper.readInt(); - - if (len <= nonSpanningRemaining - 4) { - // we can get a full record from here - try { - target.read(this.nonSpanningWrapper); - - int remaining = this.nonSpanningWrapper.remaining(); - if (remaining > 0) { - return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; - } - else if (remaining == 0) { - return DeserializationResult.LAST_RECORD_FROM_BUFFER; - } - else { - throw new IndexOutOfBoundsException("Remaining = " + remaining); - } - } - catch (IndexOutOfBoundsException e) { - throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e); - } - } - else { - // we got the length, but we need the rest from the spanning deserializer - // and need to wait for more buffers - this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len); - this.nonSpanningWrapper.clear(); - return DeserializationResult.PARTIAL_RECORD; - } - } else if (nonSpanningRemaining > 0) { - // we have an incomplete length - // add our part of the length to the length buffer - this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper); - this.nonSpanningWrapper.clear(); - return DeserializationResult.PARTIAL_RECORD; - } + if (nonSpanningWrapper.canReadLength()) { + return nonSpanningWrapper.getNextRecord( + target, + recordLength -> spanningWrapper.transferFrom(nonSpanningWrapper, recordLength)); - // spanning record case - if (this.spanningWrapper.hasFullRecord()) { - // get 
the full record - target.read(this.spanningWrapper.getInputView()); + } else if (nonSpanningWrapper.hasRemaining()) { Review comment: It would change the existing logic and make it less readable IMO. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org