[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-02-11 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r255508677
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes
+* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Non-spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength,
+   isA(IndexOutOfBoundsException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength - 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, spilling, deserialization reads one byte too many.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanningLargeRecord() throws Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingTooMuch.getRandom(),
+   32 * 1024,
+   isA(EOFException.class));
+   }
+
+   /**
+* Non-spanning, deserialization forgets to read one byte - failure report comes from an
+* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughNonSpanning() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Spanning, deserialization forgets to read one byte - failure report comes from an additional
+* check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   (serializedLength) -> serializedLength - 1);
+   }
+
+   /**
+* Spanning, serialization length is 17 (including headers), deserialization forgets to read one
+* byte - failure report comes from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   1);
+   }
+
+   /**
+* Spanning, spilling, deserialization forgets to read one byte - failure report comes from an
+* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanningLargeRecord() throws Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingNotEnough.getRandom(),
+   32 * 1024);
+   }
+
+   private void testHandleWrongDeserialization(
+   WrongDeserializationValue testValue,
+   IntFunction segmentSizeProvider,
+   Matcher expectedCause) throws Exception {
+   expectedException.expectCause(expectedCause);
+   testHandleWrongDeserialization(testValue, 

[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-02-11 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r252367679
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes
+* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Non-spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength,
+   isA(IndexOutOfBoundsException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength - 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, spilling, deserialization reads one byte too many.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanningLargeRecord() throws Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingTooMuch.getRandom(),
+   32 * 1024,
+   isA(EOFException.class));
+   }
+
+   /**
+* Non-spanning, deserialization forgets to read one byte - failure report comes from an
+* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughNonSpanning() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Spanning, deserialization forgets to read one byte - failure report comes from an additional
+* check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   (serializedLength) -> serializedLength - 1);
+   }
+
+   /**
+* Spanning, serialization length is 17 (including headers), deserialization forgets to read one
+* byte - failure report comes from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   1);
+   }
+
+   /**
+* Spanning, spilling, deserialization forgets to read one byte - failure report comes from an
+* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanningLargeRecord() throws Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingNotEnough.getRandom(),
+   32 * 1024);
+   }
+
+   private void testHandleWrongDeserialization(
+   WrongDeserializationValue testValue,
+   IntFunction segmentSizeProvider,
+   Matcher expectedCause) throws Exception {
+   expectedException.expectCause(expectedCause);
+   testHandleWrongDeserialization(testValue, 


[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-01-30 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r252362202
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes
+* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Non-spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength,
+   isA(IndexOutOfBoundsException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength - 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, spilling, deserialization reads one byte too many.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanningLargeRecord() throws Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingTooMuch.getRandom(),
+   32 * 1024,
+   isA(EOFException.class));
+   }
+
+   /**
+* Non-spanning, deserialization forgets to read one byte - failure report comes from an
+* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughNonSpanning() throws Exception {
+   testHandleWrongDeserialization(
 
 Review comment:
   let's discuss in the thread above...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-01-30 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r252362102
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes
+* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Non-spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength,
+   isA(IndexOutOfBoundsException.class));
 
 Review comment:
   This is the current behaviour (non-spanning accesses arrays and spanning 
provides an input view that resembles file/stream accesses, each with the 
respective exceptions in case of failures)...
   Do you think we should be more generic and tolerate any `IOException` 
here? But then the cause may not be right...
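
The failure modes contrasted here (direct array access for non-spanning records vs. a stream-like input view for spanning records) can be reproduced in isolation with plain JDK classes. The sketch below is illustrative only and is not Flink code; the class and method names are made up:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;

// Hypothetical demo: reading one byte past the end fails differently
// depending on whether the source is an array or a stream view.
public class OverreadDemo {

    // Array-backed ("non-spanning"-style) access: direct indexing.
    static byte readArray(byte[] data, int pos) {
        return data[pos]; // pos == data.length -> ArrayIndexOutOfBoundsException
    }

    // Stream-backed ("spanning"-style) access: DataInput semantics.
    static byte readStream(byte[] data, int skip) throws Exception {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        in.skipBytes(skip);
        return in.readByte(); // reading past the end -> EOFException
    }

    public static void main(String[] args) throws Exception {
        byte[] data = {1, 2, 3};
        try {
            readArray(data, 3);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("array read: " + e.getClass().getSimpleName());
        }
        try {
            readStream(data, 3);
        } catch (EOFException e) {
            System.out.println("stream read: " + e.getClass().getSimpleName());
        }
    }
}
```

Matching on `isA(IndexOutOfBoundsException.class)` vs. `isA(EOFException.class)` in the tests above thus pins down which code path detected the over-read.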




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-01-30 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r252359992
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes
+* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
 
 Review comment:
   Since the user can write custom serializers, we need to test that our code 
handles them as expected. This is what the test does. Or maybe I'm not getting 
your argument...




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-01-30 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r252359412
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 ##
 @@ -575,6 +615,18 @@ private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deseriali
}
}
 
+   private int getRemainingBytes() {
+   if (this.spillFileReader == null) {
+   return this.serializationReadBuffer.available();
+   } else {
+   try {
+   return this.spillFileReader.available();
+   } catch (IOException ignored) {
 
 Review comment:
   Hmm - it may look dangerous but honestly, if retrieving the number of 
remaining bytes runs into an error, it will still be an error when trying to 
continue retrieving actual data (if we even do so). Since this is only used for 
better error reporting, I didn't want to add another source of errors.
   I could maybe rename the method to state that it tolerates failures so that 
it is not confusing for future users, e.g. `getRemainingBytesOrZero()`. How 
about that instead?
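
A hedged sketch of what such a renamed `getRemainingBytesOrZero()` could look like; the field names and stream types are assumptions for illustration, not the actual Flink implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch only: a best-effort count of remaining bytes, used
// purely for error reporting. An IOException here is swallowed and reported
// as 0 so it never masks the original deserialization failure.
public class RemainingBytesDemo {
    private final InputStream serializationReadBuffer; // assumed in-memory source
    private final InputStream spillFileReader;         // null unless spilling to disk

    RemainingBytesDemo(InputStream buffer, InputStream spillReader) {
        this.serializationReadBuffer = buffer;
        this.spillFileReader = spillReader;
    }

    /** Remaining bytes of the active source, or 0 if that cannot be determined. */
    int getRemainingBytesOrZero() {
        InputStream source =
            spillFileReader != null ? spillFileReader : serializationReadBuffer;
        try {
            return source.available();
        } catch (IOException ignored) {
            // only used for error messages; do not add a new failure source
            return 0;
        }
    }

    public static void main(String[] args) {
        RemainingBytesDemo inMemory =
            new RemainingBytesDemo(new ByteArrayInputStream(new byte[5]), null);
        System.out.println(inMemory.getRemainingBytesOrZero()); // prints 5

        InputStream failing = new InputStream() {
            @Override public int read() { return -1; }
            @Override public int available() throws IOException {
                throw new IOException("spill file gone");
            }
        };
        RemainingBytesDemo spilled =
            new RemainingBytesDemo(new ByteArrayInputStream(new byte[5]), failing);
        System.out.println(spilled.getRemainingBytesOrZero()); // prints 0
    }
}
```

The `OrZero` suffix makes the swallowed-exception contract visible at every call site, which is the naming point being discussed.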




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-01-30 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r252346869
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/DeserializationUtils.java
 ##
 @@ -36,18 +40,25 @@
 *
 * @param records records to be deserialized
 * @param deserializer the record deserializer
+* @param mustBeFullRecords if set, fails if the deserialized records contain partial records
 * @return the number of full deserialized records
 */
public static int deserializeRecords(
ArrayDeque records,
-   RecordDeserializer deserializer) throws Exception {
+   RecordDeserializer deserializer,
+   boolean mustBeFullRecords) throws Exception {
int deserializedRecords = 0;
 
while (!records.isEmpty()) {
SerializationTestType expected = records.poll();
SerializationTestType actual = expected.getClass().newInstance();
 
-   if (deserializer.getNextRecord(actual).isFullRecord()) {
+   RecordDeserializer.DeserializationResult deserializationResult =
+   deserializer.getNextRecord(actual);
+   if (mustBeFullRecords) {
+   assertThat(deserializationResult, hasProperty("fullRecord", equalTo(true)));
 
 Review comment:
   (Slightly) better context:
   ```
   java.lang.AssertionError: 
   Expected: hasProperty("fullRecord", <true>)
but: property 'fullRecord' was <false>
   
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
at 
org.apache.flink.runtime.io.network.util.DeserializationUtils.deserializeRecords(DeserializationUtils.java:61)
   ```
   vs.
   ```
   java.lang.AssertionError
at org.junit.Assert.fail(Assert.java:86)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertTrue(Assert.java:52)
at 
org.apache.flink.runtime.io.network.util.DeserializationUtils.deserializeRecords(DeserializationUtils.java:61)
   ```
   
   This sometimes allows faster debugging because for some things you don't 
have to click into the source to know what actually failed.
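
The benefit described here can be shown without Hamcrest as well; the following standalone sketch (hypothetical helper names, not Flink or JUnit code) mimics how a matcher-style message carries the expected and actual values while a bare assertion carries nothing:

```java
// Illustrative only: contrasting a bare assertion with a matcher-style one
// whose failure message states what was expected and what was found.
public class AssertStyles {

    // Bare style: the AssertionError has no message at all.
    static void assertTrueBare(boolean condition) {
        if (!condition) {
            throw new AssertionError();
        }
    }

    // Matcher style: the message itself names the property and both values.
    static void assertPropertyEquals(String property, Object expected, Object actual) {
        if (expected == null ? actual != null : !expected.equals(actual)) {
            throw new AssertionError(
                "Expected: hasProperty(\"" + property + "\", <" + expected + ">)"
                + " but: property '" + property + "' was <" + actual + ">");
        }
    }

    public static void main(String[] args) {
        try {
            assertPropertyEquals("fullRecord", true, false);
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
        try {
            assertTrueBare(false);
        } catch (AssertionError e) {
            System.out.println("bare assert message: " + e.getMessage());
        }
    }
}
```

With the matcher style, the failure output alone tells you the `fullRecord` property was `false` when `true` was expected; with the bare style you must open the source to learn what failed.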




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-01-30 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r252342469
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 ##
 @@ -550,6 +550,7 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
}
else {
spillingChannel.close();
+   spillingChannel = null;
 
 Review comment:
   It's not quite the same logic though...in `clear()`, any exceptions from the 
`close()` call are ignored. Seems a bit messy putting this into a single method 
to be "re-usable"...
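
A minimal sketch of the two close paths being discussed - propagating `close()` failures on the normal path versus swallowing them in `clear()`-style cleanup - assuming hypothetical names (this is not the actual Flink code):

```java
import java.io.Closeable;
import java.io.IOException;

// Illustrative only: the two distinct close paths. On the normal path,
// close() errors must propagate and the reference is nulled so the channel
// cannot be reused accidentally; in clear()-style cleanup, close() errors
// are deliberately ignored because cleanup must not fail.
public class SpillChannelHolder {
    private Closeable spillingChannel;

    SpillChannelHolder(Closeable channel) {
        this.spillingChannel = channel;
    }

    // Normal path: propagate failures, then drop the reference.
    void finishSpilling() throws IOException {
        spillingChannel.close();
        spillingChannel = null;
    }

    // clear()-style cleanup: best effort, exceptions ignored.
    void clear() {
        if (spillingChannel != null) {
            try {
                spillingChannel.close();
            } catch (IOException ignored) {
                // cleanup must not throw
            }
            spillingChannel = null;
        }
    }

    boolean hasChannel() {
        return spillingChannel != null;
    }

    public static void main(String[] args) throws IOException {
        SpillChannelHolder holder = new SpillChannelHolder(() -> { });
        holder.finishSpilling();
        System.out.println("has channel after finish: " + holder.hasChannel());
    }
}
```

Because the exception handling differs between the two paths, merging them into one "re-usable" method would indeed change behaviour on one of them, which is the objection raised above.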




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-20 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r234954344
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
 ##
 @@ -62,127 +68,103 @@ public static Configuration getConfiguration() {
}
 
@Test
-   public void testIncorrectSerializer1() {
-   try {
-   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-   env.setParallelism(PARLLELISM);
-   env.getConfig().disableSysoutLogging();
+   public void testIncorrectSerializer1() throws Exception {
+   expectedException.expect(JobExecutionException.class);
+   expectedException.expectCause(
+   Matchers.both(isA(IOException.class))
+   .and(hasProperty("message", containsString("broken serialization"))));
+
+   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(PARLLELISM);
+   env.getConfig().disableSysoutLogging();
+
+   env
+   .generateSequence(1, 10 * PARLLELISM)
+   .map(new MapFunction() {
+   @Override
+   public ConsumesTooMuch map(Long value) throws Exception {
+   return new ConsumesTooMuch();
+   }
+   })
+   .rebalance()
+   .output(new DiscardingOutputFormat());
+
+   env.execute();
+   }
+
+   @Test
+   public void testIncorrectSerializer2() throws Exception {
+   expectedException.expect(JobExecutionException.class);
+   expectedException.expectCause(
+   Matchers.both(isA(IOException.class))
+   .and(hasProperty("message", containsString("broken serialization"))));
+
+   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(PARLLELISM);
+   env.getConfig().disableSysoutLogging();
 
-   env
+   env
.generateSequence(1, 10 * PARLLELISM)
-   .map(new MapFunction() {
+   .map(new MapFunction() {
@Override
-   public ConsumesTooMuch map(Long value) throws Exception {
-   return new ConsumesTooMuch();
+   public ConsumesTooMuchSpanning map(Long value) throws Exception {
+   return new ConsumesTooMuchSpanning();
}
})
.rebalance()
-   .output(new DiscardingOutputFormat());
+   .output(new DiscardingOutputFormat());
 
-   env.execute();
-   }
-   catch (JobExecutionException e) {
-   Throwable rootCause = e.getCause();
-   assertTrue(rootCause instanceof IOException);
-   assertTrue(rootCause.getMessage().contains("broken serialization"));
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
-   }
+   env.execute();
}
 
@Test
-   public void testIncorrectSerializer2() {
-   try {
-   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-   env.setParallelism(PARLLELISM);
-   env.getConfig().disableSysoutLogging();
-
-   env
-   .generateSequence(1, 10 * PARLLELISM)
-   .map(new MapFunction() {
-   @Override
-   public ConsumesTooMuchSpanning map(Long value) throws Exception {
-   return new ConsumesTooMuchSpanning();
-   }
-   })
-   .rebalance()
-   .output(new DiscardingOutputFormat());
-
-   env.execute();
-   }
-   catch (JobExecutionException e) {
-   Throwable rootCause = e.getCause();
-

[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-20 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r234642761
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
 ##
 @@ -62,127 +68,103 @@ public static Configuration getConfiguration() {
}
 
@Test
-   public void testIncorrectSerializer1() {
-   try {
-   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-   env.setParallelism(PARLLELISM);
-   env.getConfig().disableSysoutLogging();
+   public void testIncorrectSerializer1() throws Exception {
 
 Review comment:
   ~yes (hopefully) - well, have a look at the new de-dup code, I tried 
different implementations and ended up at this one which was most practical~ 
oh, sorry, I actually forgot this unit test




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-19 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r234650118
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 ##
 @@ -649,4 +713,23 @@ private static String randomString(Random random) {
return StringUtils.byteToHexString(bytes);
}
}
+
+   private static void throwDeserializationError(
+   int len,
+   int remainingBytes,
+   int leftOverDataStart,
+   int leftOverDataLimit,
+   @Nullable Throwable cause) throws IOException {
 
 Review comment:
   I still think, for this concrete code, this seems over-engineered, but well, 
here you go.




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-19 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r234642761
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
 ##
 @@ -62,127 +68,103 @@ public static Configuration getConfiguration() {
}
 
@Test
-   public void testIncorrectSerializer1() {
-   try {
-   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-   env.setParallelism(PARLLELISM);
-   env.getConfig().disableSysoutLogging();
+   public void testIncorrectSerializer1() throws Exception {
 
 Review comment:
   yes (hopefully) - well, have a look at the new de-dup code, I tried 
different implementations and ended up at this one which was most practical




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230452399
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
 Review comment:
   actually, the type does not really matter - I could just as well use a `LargeObjectType` of an arbitrary length






[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230440178
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
 
 Review comment:
   I'm not quite sure where you are going with this - do you want it randomised?




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230420404
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   32 * 1024,
+   null);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   new StringValueDeserializingTooMuch("Test string"),
+   17,
 
 Review comment:
   yes, that would be really nice, but it is not available out of the box: `StringValue#getBinaryLength` only returns `-1`, and additionally the `SpanningRecordSerializer` adds one more length header. I could, however, determine the actual value by serializing the record (maybe only once, statically) and keep using that.
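The idea of measuring the serialized length once instead of hard-coding it could be sketched as below. This is a minimal, hypothetical stand-in: `DataOutputStream#writeUTF` plays the role of `StringValue#write`, and the 4-byte length header mimics the one prepended by the `SpanningRecordSerializer`; the real test would serialize through Flink's own serializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Hypothetical sketch: measure a record's serialized length once and reuse it,
 * rather than hard-coding a magic number in the test.
 */
public class SerializedLengthProbe {

	// assumption: the serializer prepends a 4-byte length header per record
	private static final int LENGTH_HEADER = 4;

	public static int serializedLength(String value) throws IOException {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (DataOutputStream out = new DataOutputStream(bytes)) {
			out.writeUTF(value); // stand-in for the record's write(DataOutputView)
		}
		return LENGTH_HEADER + bytes.size();
	}

	public static void main(String[] args) throws IOException {
		// measure once (e.g. in a static initializer) and keep using the value
		System.out.println("serialized length incl. header: " + serializedLength("Test string"));
	}
}
```

With `writeUTF`, "Test string" happens to come out at 17 bytes including the assumed header, but the point is that the test no longer needs to know that number up front.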




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230404808
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 ##
 @@ -549,21 +578,67 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
}
else {
spillingChannel.close();
+   spillingChannel = null;
 
-   BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
+   BufferedInputStream inStream =
+   new BufferedInputStream(
+   new FileInputStream(checkNotNull(spillFile)),
+   2 * 1024 * 1024);
this.spillFileReader = new DataInputViewStreamWrapper(inStream);
}
}
}
 
-   private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
+   private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) throws IOException {
+   checkForDeserializationError(null);
+
deserializer.clear();
 
if (leftOverData != null) {
deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
}
}
 
+   /**
+* In addition to a potentially thrown {@link EOFException}, checks for further
+* deserialization errors and tries to throw an exception with a more meaningful error
+* message.
+*
+* @param eofException
+*  exception thrown before the check (or null if there was none)
+*
+* @throws IOException
+*  in case of too few or too many bytes read, an exception with more useful data for
+*  debugging
+*/
+   private void checkForDeserializationError(@Nullable EOFException eofException) throws IOException {
 
 Review comment:
   Actually, `null` values play quite nicely with exception causes and do not require additional branches or overloads - this boils down to a lot less code here. I'd keep it - let's see if you can live with it after the other refactorings ;)
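The "nullable cause" pattern being defended above can be sketched in isolation: one check method takes the prior exception (possibly `null`) and simply passes it as the cause. `Throwable` accepts a `null` cause, so no extra branches or overloads are needed. Names and the byte-count logic here are illustrative, not Flink's actual code.

```java
import java.io.EOFException;
import java.io.IOException;

/**
 * Minimal sketch of a single check method with a nullable prior exception:
 * call sites with and without a preceding EOFException stay uniform.
 */
public class NullCauseCheck {

	static void checkForDeserializationError(int remainingBytes, /* @Nullable */ EOFException eofException)
			throws IOException {
		if (remainingBytes != 0) {
			// a null cause is legal here; IOException(String, Throwable) simply leaves it unset
			throw new IOException(remainingBytes + " remaining unread byte(s)", eofException);
		}
	}

	public static void main(String[] args) {
		try {
			checkForDeserializationError(-1, null); // call site without a prior exception
		} catch (IOException e) {
			System.out.println(e.getMessage() + ", cause: " + e.getCause());
		}
	}
}
```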




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-11-02 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230398908
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 ##
 @@ -121,7 +134,14 @@ else if (remaining == 0) {
}
}
catch (IndexOutOfBoundsException e) {
-   throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
+   int bytesRead = this.nonSpanningWrapper.position - oldPosition;
 
 Review comment:
   Actually, we can now never get to the case that line 133 shows - I'll change that to a `checkState` instead.
   
   I can de-dup a bit, but I will have to add two position parameters because it is not always a `nonSpanningWrapper` for which we would know what to insert.
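Replacing a now-unreachable error branch with a state check might look like the following sketch. `checkState` mimics the behavior of Flink's `Preconditions#checkState` (throw `IllegalStateException` on a violated invariant); the surrounding length logic is purely illustrative.

```java
/**
 * Sketch: an if/throw for a case that can no longer occur is turned into a
 * checkState that documents the invariant instead.
 */
public class CheckStateSketch {

	// assumption: behaves like org.apache.flink.util.Preconditions#checkState
	static void checkState(boolean condition, String message) {
		if (!condition) {
			throw new IllegalStateException(message);
		}
	}

	static int payloadLength(int remaining) {
		// invariant: callers only reach this point with a complete 4-byte length header
		checkState(remaining >= 4, "Expected at least 4 bytes for the length header, had " + remaining);
		return remaining - 4;
	}

	public static void main(String[] args) {
		System.out.println(payloadLength(16));
	}
}
```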




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-10-23 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r227307487
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -104,11 +116,216 @@ public void testHandleMixedLargeRecords() throws Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
+   expectedException.expect(IOException.class);
+   expectedException.expectMessage(" -1 remaining unread byte");
+   testHandleWrongDeserialization(new StringValueDeserializingTooMuch("Test string"), 32 * 1024);
+   }
+
+   /**
+* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte
+* too many and succeeds.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
+   expectedException.expect(IOException.class);
+   expectedException.expectMessage(" -1 remaining unread byte");
+   testHandleWrongDeserialization(new StringValueDeserializingTooMuch("Test string"), 17);
 
 Review comment:
   Actually, I'd like to follow a one-scenario-per-test pattern for better isolation and for testing specific things during debugging, etc.
   Let me try to de-dup in another way: by moving the `ExpectedException` setup into `testHandleWrongDeserialization`.
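The de-dup direction suggested above could be sketched as follows. Plain Java stands in for JUnit's `ExpectedException` rule so the sketch is self-contained; the helper name mirrors the discussion, but its signature and the scenario are assumptions, not the PR's actual code.

```java
/**
 * Sketch: the exception expectation is folded into the shared helper, so each
 * test method stays a single scenario with no duplicated ExpectedException setup.
 */
public class WrongDeserializationHelper {

	interface ThrowingRunnable {
		void run() throws Exception;
	}

	static void testHandleWrongDeserialization(
			ThrowingRunnable scenario,
			Class<? extends Exception> expectedType,
			String expectedMessagePart) throws Exception {
		try {
			scenario.run();
		} catch (Exception e) {
			if (expectedType.isInstance(e) && e.getMessage() != null
					&& e.getMessage().contains(expectedMessagePart)) {
				return; // expected failure observed
			}
			throw e; // unexpected exception: rethrow for the test framework
		}
		throw new AssertionError("expected " + expectedType.getSimpleName());
	}

	public static void main(String[] args) throws Exception {
		// one scenario per test: a deserializer that reads one byte too many
		testHandleWrongDeserialization(
			() -> { throw new java.io.IOException("serializer consumed -1 remaining unread byte"); },
			java.io.IOException.class,
			" -1 remaining unread byte");
		System.out.println("scenario behaved as expected");
	}
}
```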




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-10-22 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r226984032
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 ##
 @@ -137,7 +162,16 @@ else if (remaining == 0) {
// spanning record case
if (this.spanningWrapper.hasFullRecord()) {
// get the full record
-   target.read(this.spanningWrapper.getInputView());
+   try {
+   target.read(this.spanningWrapper.getInputView());
+   } catch (EOFException e) {
+   Optional deserializationError = this.spanningWrapper.getDeserializationError(1);
 
 Review comment:
   For the first part, see above.
   
   Regarding the number of checks: basically, the `spanningWrapper.getDeserializationError(1)` check covers the case where we tried to consume more bytes than were available, and `spanningWrapper.getDeserializationError(0)` the case where we did not consume all the bytes we were supposed to consume. Only one of the two checks is executed in any given case.
   
   -> I hope this will become clearer after the refactoring.
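The two complementary checks described above can be sketched in isolation: on the success path the check runs with `addToReadBytes = 0` (did we leave bytes unconsumed?), and from the `EOFException` handler with `addToReadBytes = 1` (we attempted to read past the record). The method name mirrors the discussion; the arithmetic and message are illustrative assumptions.

```java
import java.util.Optional;

/**
 * Sketch of the mutually exclusive over-/under-consumption checks: for a given
 * record, exactly one of the two call variants runs.
 */
public class DeserializationErrorCheck {

	static Optional<String> getDeserializationError(int recordLength, int bytesRead, int addToReadBytes) {
		// positive: deserializer stopped early; negative: it read past the record
		int remaining = recordLength - (bytesRead + addToReadBytes);
		if (remaining != 0) {
			return Optional.of(remaining + " remaining unread byte(s) after deserialization");
		}
		return Optional.empty();
	}

	public static void main(String[] args) {
		// success path (addToReadBytes = 0): exactly the whole record consumed
		System.out.println(getDeserializationError(16, 16, 0));
		// EOFException path (addToReadBytes = 1): account for the attempted extra byte
		System.out.println(getDeserializationError(16, 16, 1));
	}
}
```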




[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2018-10-22 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r226979613
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 ##
 @@ -549,21 +584,53 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
}
else {
spillingChannel.close();
+   spillingChannel = null;
 
-   BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
+   BufferedInputStream inStream =
+   new BufferedInputStream(
+   new FileInputStream(checkNotNull(spillFile)),
+   2 * 1024 * 1024);
this.spillFileReader = new DataInputViewStreamWrapper(inStream);
}
}
}
 
-   private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
+   private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) throws IOException {
+   Optional deserializationError = getDeserializationError(0);
+   if (deserializationError.isPresent()) {
+   throw new IOException(deserializationError.get());
+   }
+
deserializer.clear();
 
if (leftOverData != null) {
deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
}
}
 
+   private Optional getDeserializationError(int addToReadBytes) {
 
 Review comment:
   It took me a bit, but then I realised why: if this code throws an `EOFException`, `remainingSpanningBytes` will report `0` and not fail (falling back to the thrown `EOFException` instead of our more detailed custom one):
   ```
try {
	target.read(this.spanningWrapper.getInputView());
} catch (EOFException e) {
   ```
   -> by simulating that we read too many bytes (which is actually true), we get our exception containing the line `-1 remaining unread byte`. It is a bit hacky, though. Let me try to refactor and add comments nonetheless.
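The fallback described here can be condensed into a toy sketch: an overrunning read throws a bare `EOFException`, and the handler re-checks with one simulated extra byte so the detailed "-1 remaining unread byte" message surfaces instead. All names and the byte accounting are illustrative assumptions, not the deserializer's real code.

```java
import java.io.EOFException;
import java.io.IOException;

/**
 * Sketch: translating an uninformative EOFException from an overrunning read
 * into a detailed IOException by simulating the extra byte that was attempted.
 */
public class EofOverrunCheck {

	static void readRecord(int recordLength, int bytesAvailable) throws IOException {
		try {
			// the faulty deserializer tries to read recordLength + 1 bytes
			if (bytesAvailable < recordLength + 1) {
				throw new EOFException();
			}
		} catch (EOFException e) {
			// count the attempted extra byte as read, so the remainder goes negative
			int remaining = recordLength - (recordLength + 1);
			throw new IOException(remaining + " remaining unread byte", e);
		}
	}

	public static void main(String[] args) {
		try {
			readRecord(16, 16); // only 16 bytes available, 17 attempted
		} catch (IOException e) {
			System.out.println(e.getMessage());
		}
	}
}
```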

