[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r255508677 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes +* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + 32 * 1024); + } + + /** +* Non-spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength, + isA(IndexOutOfBoundsException.class)); + } + + /** +* Spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength - 1, + isA(EOFException.class)); + } + + /** +* Spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> 1, + isA(EOFException.class)); + } + + /** +* Spanning, spilling, deserialization reads one byte too many. 
+*/ + @Test + public void testHandleDeserializingTooMuchSpanningLargeRecord() throws Exception { + testHandleWrongDeserialization( + LargeObjectTypeDeserializingTooMuch.getRandom(), + 32 * 1024, + isA(EOFException.class)); + } + + /** +* Non-spanning, deserialization forgets to read one byte - failure report comes from an +* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughNonSpanning() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + 32 * 1024); + } + + /** +* Spanning, deserialization forgets to read one byte - failure report comes from an additional +* check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + (serializedLength) -> serializedLength - 1); + } + + /** +* Spanning, serialization length is 17 (including headers), deserialization forgets to read one +* byte - failure report comes from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + 1); + } + + /** +* Spanning, spilling, deserialization forgets to read one byte - failure report comes from an +* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanningLargeRecord() throws Exception { + testHandleWrongDeserialization( + LargeObjectTypeDeserializingNotEnough.getRandom(), + 32 * 1024); + } + + private void testHandleWrongDeserialization( + WrongDeserializationValue testValue, + IntFunction segmentSizeProvider, + Matcher expectedCause) throws Exception { + expectedException.expectCause(expectedCause); + testHandleWrongDeserialization(testValue,
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r252367679 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes +* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + 32 * 1024); + } + + /** +* Non-spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength, + isA(IndexOutOfBoundsException.class)); + } + + /** +* Spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength - 1, + isA(EOFException.class)); + } + + /** +* Spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> 1, + isA(EOFException.class)); + } + + /** +* Spanning, spilling, deserialization reads one byte too many. 
+*/ + @Test + public void testHandleDeserializingTooMuchSpanningLargeRecord() throws Exception { + testHandleWrongDeserialization( + LargeObjectTypeDeserializingTooMuch.getRandom(), + 32 * 1024, + isA(EOFException.class)); + } + + /** +* Non-spanning, deserialization forgets to read one byte - failure report comes from an +* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughNonSpanning() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + 32 * 1024); + } + + /** +* Spanning, deserialization forgets to read one byte - failure report comes from an additional +* check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + (serializedLength) -> serializedLength - 1); + } + + /** +* Spanning, serialization length is 17 (including headers), deserialization forgets to read one +* byte - failure report comes from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + 1); + } + + /** +* Spanning, spilling, deserialization forgets to read one byte - failure report comes from an +* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanningLargeRecord() throws Exception { + testHandleWrongDeserialization( + LargeObjectTypeDeserializingNotEnough.getRandom(), + 32 * 1024); + } + + private void testHandleWrongDeserialization( + WrongDeserializationValue testValue, + IntFunction segmentSizeProvider, + Matcher expectedCause) throws Exception { + expectedException.expectCause(expectedCause); + testHandleWrongDeserialization(testValue,
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r252362202 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes +* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + 32 * 1024); + } + + /** +* Non-spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength, + isA(IndexOutOfBoundsException.class)); + } + + /** +* Spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength - 1, + isA(EOFException.class)); + } + + /** +* Spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> 1, + isA(EOFException.class)); + } + + /** +* Spanning, spilling, deserialization reads one byte too many. 
+*/ + @Test + public void testHandleDeserializingTooMuchSpanningLargeRecord() throws Exception { + testHandleWrongDeserialization( + LargeObjectTypeDeserializingTooMuch.getRandom(), + 32 * 1024, + isA(EOFException.class)); + } + + /** +* Non-spanning, deserialization forgets to read one byte - failure report comes from an +* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughNonSpanning() throws Exception { + testHandleWrongDeserialization( Review comment: let's discuss in the thread above... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r252362102 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes +* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + 32 * 1024); + } + + /** +* Non-spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength, + isA(IndexOutOfBoundsException.class)); Review comment: This is the current behaviour (non-spanning accesses arrays and spanning provides an input view that resembles file/stream accesses, each with the respective exceptions in case of failures)... Do you think we should be more generic and tolerate any `IOException` here? But then the cause may not be right...
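The array-vs-stream asymmetry described in this comment can be reproduced with plain JDK classes. This is an illustrative sketch, not Flink code: an array-backed overread surfaces as an `ArrayIndexOutOfBoundsException` (a subclass of `IndexOutOfBoundsException`), while the same overread through a stream-backed `DataInput` surfaces as an `EOFException`.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

/**
 * Sketch of the exception asymmetry between the non-spanning (array-backed)
 * and spanning (stream-backed) deserialization paths discussed above.
 */
class OverreadExceptions {

    /** Simulates a deserializer reading one byte past an array-backed buffer. */
    static Class<? extends Exception> overreadArray(byte[] data) {
        try {
            byte b = data[data.length]; // index == length: out of bounds
            return null;
        } catch (Exception e) {
            return e.getClass();
        }
    }

    /** Simulates the same overread against a bounded stream-backed input view. */
    static Class<? extends Exception> overreadStream(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readFully(new byte[data.length]); // consume everything that was written
            in.readByte();                       // one byte too many
            return null;
        } catch (IOException e) {
            return e.getClass();
        }
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4};
        // array path fails with ArrayIndexOutOfBoundsException,
        // stream path with EOFException
        System.out.println(overreadArray(data).getSimpleName());
        System.out.println(overreadStream(data).getSimpleName());
    }
}
```

This is why the non-spanning test above expects an `IndexOutOfBoundsException` while the spanning tests expect an `EOFException`.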
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r252359992 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes +* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { Review comment: Since the user can write custom serializers, we need to test that our code handles them as expected. This is what the test does. Or maybe I'm not getting your argument...
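The failure mode these tests inject can be sketched with a toy broken serializer (hypothetical type, not the PR's `DeserializingTooMuch` test class): the read side consumes one byte more than the write side produced, which a bounded input surfaces as an `EOFException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

/**
 * Toy stand-in for a user-written serializer whose read side is broken:
 * it reads one byte more than the write side wrote.
 */
class DeserializingTooMuchValue {

    /** Correct write side: serializes exactly 4 bytes. */
    static byte[] serialize(int value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        }
        return bytes.toByteArray();
    }

    /** Broken read side: consumes one byte beyond what was written. */
    static int deserializeBroken(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int value = in.readInt();
            in.readByte(); // BUG under test: one byte too many
            return value;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = serialize(42);
        try {
            deserializeBroken(data);
        } catch (EOFException expected) {
            System.out.println("overread detected as EOFException");
        }
    }
}
```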
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r252359412 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ## @@ -575,6 +615,18 @@ private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deseriali } } + private int getRemainingBytes() { + if (this.spillFileReader == null) { + return this.serializationReadBuffer.available(); + } else { + try { + return this.spillFileReader.available(); + } catch (IOException ignored) { Review comment: Hmm - it may look dangerous but honestly, if retrieving the number of remaining bytes gets to some error, it will still be an error when trying to continue retrieving actual data (if we even do so). Since this is only used for better error reporting, I didn't want to add another source of errors. I could maybe rename the method to state that it tolerates failures so that it is not confusing for future users, e.g. `getRemainingBytesOrZero()`. How about that instead?
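The proposed rename could look roughly like this (class and parameter names are assumptions; Flink's actual readers are not plain `InputStream`s): the helper is explicitly allowed to fail and falls back to 0, since its result only enriches an error message.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Sketch of a failure-tolerant byte counter, following the
 * getRemainingBytesOrZero() naming suggested in the review comment.
 */
class RemainingBytes {

    /** Returns the reader's available bytes, or 0 if querying that itself fails. */
    static int getRemainingBytesOrZero(InputStream reader) {
        try {
            return reader.available();
        } catch (IOException ignored) {
            // only used for error reporting: failing here would mask the
            // original deserialization error, so swallow and report 0
            return 0;
        }
    }

    public static void main(String[] args) {
        InputStream healthy = new ByteArrayInputStream(new byte[5]);
        InputStream broken = new InputStream() {
            @Override
            public int read() { return -1; }
            @Override
            public int available() throws IOException {
                throw new IOException("backing spill file is gone");
            }
        };
        System.out.println(getRemainingBytesOrZero(healthy)); // 5
        System.out.println(getRemainingBytesOrZero(broken));  // 0
    }
}
```

The name makes the contract visible at the call site: callers know a 0 may mean "unknown", which is acceptable for a diagnostic message.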
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r252346869 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/DeserializationUtils.java ## @@ -36,18 +40,25 @@ * * @param records records to be deserialized * @param deserializer the record deserializer +* @param mustBeFullRecords if set, fails if the deserialized records contain partial records * @return the number of full deserialized records */ public static int deserializeRecords( ArrayDeque<SerializationTestType> records, - RecordDeserializer<SerializationTestType> deserializer) throws Exception { + RecordDeserializer<SerializationTestType> deserializer, + boolean mustBeFullRecords) throws Exception { int deserializedRecords = 0; while (!records.isEmpty()) { SerializationTestType expected = records.poll(); SerializationTestType actual = expected.getClass().newInstance(); - if (deserializer.getNextRecord(actual).isFullRecord()) { + RecordDeserializer.DeserializationResult deserializationResult = + deserializer.getNextRecord(actual); + if (mustBeFullRecords) { + assertThat(deserializationResult, hasProperty("fullRecord", equalTo(true))); Review comment: (Slightly) better context:

```
java.lang.AssertionError:
Expected: hasProperty("fullRecord", <true>)
     but: property 'fullRecord' was <false>
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
	at org.apache.flink.runtime.io.network.util.DeserializationUtils.deserializeRecords(DeserializationUtils.java:61)
```

vs.

```
java.lang.AssertionError
	at org.junit.Assert.fail(Assert.java:86)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertTrue(Assert.java:52)
	at org.apache.flink.runtime.io.network.util.DeserializationUtils.deserializeRecords(DeserializationUtils.java:61)
```

This sometimes allows faster debugging because for some things you don't have to click into the source to know what actually failed.
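The gain shown in the two stack traces above comes from hamcrest embedding the expectation and the mismatched value in the `AssertionError` message. A dependency-free sketch of the same idea (hypothetical helper, neither hamcrest nor Flink code):

```java
/**
 * Dependency-free sketch of why a matcher-style assertion debugs faster
 * than a bare assertTrue: the failure text carries the expectation and
 * the actual value, so the stack trace alone says what went wrong.
 */
class DescriptiveAssert {

    /** Bare check: the error message says nothing about what was compared. */
    static String bareFailureMessage(boolean fullRecord) {
        try {
            if (!fullRecord) {
                throw new AssertionError();
            }
            return null;
        } catch (AssertionError e) {
            return String.valueOf(e.getMessage()); // "null"
        }
    }

    /** Matcher-style check: expectation and actual value in the message. */
    static String descriptiveFailureMessage(boolean fullRecord) {
        try {
            if (!fullRecord) {
                throw new AssertionError(
                        "Expected: property \"fullRecord\" <true> but: was <" + fullRecord + ">");
            }
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(bareFailureMessage(false));        // no context
        System.out.println(descriptiveFailureMessage(false)); // full context
    }
}
```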
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r252342469 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ## @@ -550,6 +550,7 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in } else { spillingChannel.close(); + spillingChannel = null; Review comment: It's not quite the same logic though... in `clear()`, any exceptions from the `close()` call are ignored. Seems a bit messy putting this into a single method to be "re-usable"...
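The pattern behind the one-line diff above can be sketched as follows (hypothetical names; an `OutputStream` stands in for the spilling channel): nulling the reference right after a successful close lets cleanup code distinguish "already released" from "still open", and it captures the two different error policies discussed in the comment, propagating on the normal path, ignoring in `clear()`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Sketch of close-then-null with two error policies, as in the diff above. */
class SpillingChannelOwner {

    private OutputStream spillingChannel = new ByteArrayOutputStream();

    /** Normal path: propagate close() failures, then mark as released. */
    void finishSpilling() throws IOException {
        spillingChannel.close();
        spillingChannel = null;
    }

    /** Cleanup path: tolerate failures, mirroring clear() in the PR. */
    void clear() {
        if (spillingChannel != null) {
            try {
                spillingChannel.close();
            } catch (IOException ignored) {
                // best-effort cleanup only
            } finally {
                spillingChannel = null;
            }
        }
    }

    boolean isReleased() {
        return spillingChannel == null;
    }

    public static void main(String[] args) throws IOException {
        SpillingChannelOwner owner = new SpillingChannelOwner();
        owner.finishSpilling();
        owner.clear(); // safe: the null check prevents a double close
        System.out.println(owner.isReleased()); // true
    }
}
```

The differing exception handling is exactly why folding both call sites into one shared method would be awkward, as the comment notes.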
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r234954344 ## File path: flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java ## @@ -62,127 +68,103 @@ public static Configuration getConfiguration() { } @Test - public void testIncorrectSerializer1() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARLLELISM); - env.getConfig().disableSysoutLogging(); + public void testIncorrectSerializer1() throws Exception { + expectedException.expect(JobExecutionException.class); + expectedException.expectCause( + Matchers.both(isA(IOException.class)) + .and(hasProperty("message", containsString("broken serialization")))); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARLLELISM); + env.getConfig().disableSysoutLogging(); + + env + .generateSequence(1, 10 * PARLLELISM) + .map(new MapFunction<Long, ConsumesTooMuch>() { + @Override + public ConsumesTooMuch map(Long value) throws Exception { + return new ConsumesTooMuch(); + } + }) + .rebalance() + .output(new DiscardingOutputFormat<ConsumesTooMuch>()); + + env.execute(); + } + + @Test + public void testIncorrectSerializer2() throws Exception { + expectedException.expect(JobExecutionException.class); + expectedException.expectCause( + Matchers.both(isA(IOException.class)) + .and(hasProperty("message", containsString("broken serialization")))); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARLLELISM); + env.getConfig().disableSysoutLogging(); - env .generateSequence(1, 10 * PARLLELISM) - .map(new MapFunction<Long, ConsumesTooMuch>() { + .map(new MapFunction<Long, ConsumesTooMuchSpanning>() { @Override - public ConsumesTooMuch map(Long value) throws Exception { - return new ConsumesTooMuch(); + public ConsumesTooMuchSpanning map(Long value) throws Exception { + return new ConsumesTooMuchSpanning(); } }) .rebalance() - .output(new DiscardingOutputFormat<ConsumesTooMuch>()); + .output(new DiscardingOutputFormat<ConsumesTooMuchSpanning>()); - env.execute(); - } - catch (JobExecutionException e) { - Throwable rootCause = e.getCause(); - assertTrue(rootCause instanceof IOException); - assertTrue(rootCause.getMessage().contains("broken serialization")); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + env.execute(); } @Test - public void testIncorrectSerializer2() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARLLELISM); - env.getConfig().disableSysoutLogging(); - - env - .generateSequence(1, 10 * PARLLELISM) - .map(new MapFunction<Long, ConsumesTooMuchSpanning>() { - @Override - public ConsumesTooMuchSpanning map(Long value) throws Exception { - return new ConsumesTooMuchSpanning(); - } - }) - .rebalance() - .output(new DiscardingOutputFormat<ConsumesTooMuchSpanning>()); - - env.execute(); - } - catch (JobExecutionException e) { - Throwable rootCause = e.getCause(); -
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r234642761 ## File path: flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java ## @@ -62,127 +68,103 @@ public static Configuration getConfiguration() { } @Test - public void testIncorrectSerializer1() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARLLELISM); - env.getConfig().disableSysoutLogging(); + public void testIncorrectSerializer1() throws Exception { Review comment: ~yes (hopefully) - well, have a look at the new de-dup code, I tried different implementations and ended up at this one which was most practical~ oh, sorry, I actually forgot this unit test
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r234650118 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ## @@ -649,4 +713,23 @@ private static String randomString(Random random) { return StringUtils.byteToHexString(bytes); } } + + private static void throwDeserializationError( + int len, + int remainingBytes, + int leftOverDataStart, + int leftOverDataLimit, + @Nullable Throwable cause) throws IOException { Review comment: I still think, for this concrete code, this seems over-engineered, but well, here you go.
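A sanity-check error helper of this shape could look roughly as follows (names and message text are assumptions, not the PR's actual wording): it folds the measured byte counts into one `IOException` and chains the underlying cause when there is one.

```java
import java.io.EOFException;
import java.io.IOException;

/**
 * Hypothetical sketch of a deserialization sanity-check error helper:
 * reports the record length and the bytes left unread, and chains the
 * original cause if any. Illustrative only.
 */
class DeserializationErrors {

    static IOException deserializationError(
            int recordLength,
            int remainingBytes,
            Throwable cause /* may be null */) {
        String message = String.format(
                "Serializer consumed a different number of bytes than the record contained: "
                        + "record length %d, %d bytes remaining unread",
                recordLength, remainingBytes);
        // chain the cause only when one exists, so the stack trace stays clean
        return cause == null ? new IOException(message) : new IOException(message, cause);
    }

    public static void main(String[] args) {
        IOException e = deserializationError(17, 1, new EOFException());
        System.out.println(e.getMessage());
        System.out.println(e.getCause().getClass().getSimpleName()); // EOFException
    }
}
```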
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r230452399 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -104,11 +120,236 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), + 32 * 1024, + null); + } + + /** +* Non-spanning, serialization length is 16 (including headers), deserialization reads one byte +* too many and succeeds. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + new StringValueDeserializingTooMuch("Test string"), Review comment: actually, the type does not really matter - I could just as well use a `LargeObjectType` or an arbitrary length
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230440178
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java (same hunk as the previous comment; again on the line `new StringValueDeserializingTooMuch("Test string"),` in `testHandleDeserializingTooMuchNonSpanning2`)

Review comment: I'm not quite getting where you are going with this? Do you want it randomised?
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230420404
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java

```java
	@Test
	public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
		testHandleWrongDeserialization(
			new StringValueDeserializingTooMuch("Test string"),
			17, // <- comment refers to this hard-coded serialized length
```

Review comment: Yes, that would be really nice, but it is not available out of the box: `StringValue#getBinaryLength` only returns `-1`, and additionally we add one more length header in the `SpanningRecordSerializer`. I could, however, evaluate the actual value by serializing the record (maybe only once, statically) and keep using that.
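The workaround proposed in the comment above - deriving the serialized length by actually serializing the value once instead of hard-coding it - can be sketched as follows. This is a minimal standalone sketch, not Flink code: `serialize` stands in for the record serializer, and the extra 4-byte length header added on top is an assumption taken from the comment.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SerializedLengthSketch {

	// Stand-in serializer for a value whose binary length is not known
	// statically (like StringValue, whose getBinaryLength() returns -1).
	static byte[] serialize(String value) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			DataOutputStream out = new DataOutputStream(bytes);
			out.writeUTF(value); // writeUTF prepends its own 2-byte length
			out.flush();
			return bytes.toByteArray();
		} catch (IOException e) {
			throw new UncheckedIOException(e); // cannot happen for in-memory streams
		}
	}

	// Serialize once and add the extra length header the record serializer
	// prepends on the wire (assumed to be 4 bytes here).
	static int serializedLengthWithHeader(String value) {
		return serialize(value).length + 4;
	}

	public static void main(String[] args) {
		// "Test string" is 11 ASCII chars: 2 (writeUTF length) + 11 + 4 = 17,
		// matching the hard-coded 17 in the test above
		System.out.println(serializedLengthWithHeader("Test string"));
	}
}
```

Computing the value once and caching it keeps the test independent of serializer-internal header sizes.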
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230404808
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java

```java
@@ -549,21 +578,67 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
			}
			else {
				spillingChannel.close();
				spillingChannel = null;

				BufferedInputStream inStream =
					new BufferedInputStream(
						new FileInputStream(checkNotNull(spillFile)),
						2 * 1024 * 1024);
				this.spillFileReader = new DataInputViewStreamWrapper(inStream);
			}
		}
	}

	private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) throws IOException {
		checkForDeserializationError(null);

		deserializer.clear();

		if (leftOverData != null) {
			deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
		}
	}

	/**
	 * In addition to a potentially thrown {@link EOFException}, checks for further
	 * deserialization errors and tries to throw an exception with a more meaningful error
	 * message.
	 *
	 * @param eofException
	 * 		exception thrown before the check (or null if there was none)
	 *
	 * @throws IOException
	 * 		in case of too few or too many bytes read, an exception with more useful data for
	 * 		debugging
	 */
	private void checkForDeserializationError(@Nullable EOFException eofException) throws IOException { // <- comment refers to this line
```

Review comment: Actually, `null` values play quite nicely with exception causes and do not require additional branches and overloads - this boils down to a lot less code here.
I'd keep it here - let's see if you can live with it after the other refactorings ;)
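The point about `null` causes can be seen in a small standalone sketch (hypothetical method name, not the actual Flink code): the `Throwable(String, Throwable)` constructor accepts a `null` cause, so a single `@Nullable` parameter covers both "wrap a prior `EOFException`" and "no prior exception" without extra overloads or branches.

```java
import java.io.EOFException;
import java.io.IOException;

public class NullCauseSketch {

	// One method handles both cases: with a prior EOFException as the cause,
	// or with null when there was none. (Hypothetical name for illustration.)
	static IOException deserializationError(String message, EOFException cause) {
		return new IOException(message, cause); // null cause is explicitly allowed
	}

	public static void main(String[] args) {
		IOException without = deserializationError("read too few bytes", null);
		IOException with = deserializationError("read too many bytes", new EOFException());
		System.out.println(without.getCause() == null);
		System.out.println(with.getCause() instanceof EOFException);
	}
}
```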
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r230398908
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java

```java
@@ -121,7 +134,14 @@ else if (remaining == 0) {
			}
		}
		catch (IndexOutOfBoundsException e) {
			int bytesRead = this.nonSpanningWrapper.position - oldPosition; // <- comment refers to this line
```

Review comment: Actually, now we cannot ever get to the case that line 133 shows - I'll change that to a `checkState` instead. I can de-duplicate a bit, but will have to add two position parameters, because it is not always a `nonSpanningWrapper` for which we would know what to insert.
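The bookkeeping discussed here - recording the buffer position before deserialization and comparing the consumed delta against the record length afterwards - can be sketched like this. This is a simplified model with plain integer offsets (hypothetical names), not the deserializer's actual wrapper code; the error text mirrors the " -1 remaining unread byte" style message from the tests.

```java
public class PositionDeltaSketch {

	// Returns null if the deserializer consumed exactly recordLength bytes,
	// otherwise a debugging message stating how many bytes were left (a
	// negative count means the deserializer read past the record's end).
	static String checkBytesConsumed(int oldPosition, int newPosition, int recordLength) {
		int bytesRead = newPosition - oldPosition;
		if (bytesRead == recordLength) {
			return null;
		}
		int remaining = recordLength - bytesRead;
		return "Serializer consumed the wrong number of bytes: "
			+ remaining + " remaining unread byte(s)";
	}

	public static void main(String[] args) {
		System.out.println(checkBytesConsumed(0, 10, 10)); // exact read: no error
		System.out.println(checkBytesConsumed(0, 11, 10)); // one byte too many
		System.out.println(checkBytesConsumed(0, 9, 10));  // one byte too few
	}
}
```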
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r227307487
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java

```java
@@ -104,11 +116,216 @@ public void testHandleMixedLargeRecords() throws Exception {
		testSerializationRoundTrip(originalRecords, segmentSize);
	}

	/**
	 * Non-spanning, deserialization reads one byte too many and succeeds.
	 */
	@Test
	public void testHandleDeserializingTooMuchNonSpanning1() throws Exception {
		expectedException.expect(IOException.class);
		expectedException.expectMessage(" -1 remaining unread byte");
		testHandleWrongDeserialization(new StringValueDeserializingTooMuch("Test string"), 32 * 1024);
	}

	/**
	 * Non-spanning, serialization length is 16 (including headers), deserialization reads one byte
	 * too many and succeeds.
	 */
	@Test
	public void testHandleDeserializingTooMuchNonSpanning2() throws Exception {
		expectedException.expect(IOException.class);
		expectedException.expectMessage(" -1 remaining unread byte");
		testHandleWrongDeserialization(new StringValueDeserializingTooMuch("Test string"), 17); // <- comment refers to this line
```

Review comment: Actually, I'd like to follow a one-scenario-per-test pattern for better isolation and for testing specific things during debugging etc. Let me try to de-duplicate in another way: moving the `ExpectedException` setup into `testHandleWrongDeserialization`.
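The de-duplication proposed in the comment - keeping one scenario per test method but moving the shared failure expectation into the helper - could look roughly like this. A sketch with hypothetical signatures only: the real test uses JUnit's `ExpectedException` rule and Hamcrest matchers rather than this hand-rolled helper.

```java
public class ExpectedFailureSketch {

	// Shared helper: runs one round-trip scenario; when an expected error
	// message fragment is given, asserts that the scenario fails with it.
	// (scenario is a stand-in for a testHandleWrongDeserialization run.)
	static void runExpectingMessage(Runnable scenario, String expectedFragment) {
		try {
			scenario.run();
		} catch (RuntimeException e) {
			if (expectedFragment != null && e.getMessage() != null
					&& e.getMessage().contains(expectedFragment)) {
				return; // expected failure observed
			}
			throw e; // unexpected failure: rethrow for the test framework
		}
		if (expectedFragment != null) {
			throw new AssertionError("expected failure containing: " + expectedFragment);
		}
	}

	public static void main(String[] args) {
		// each test method still covers exactly one scenario,
		// but shares the expectation setup via the helper
		runExpectingMessage(() -> {
			throw new RuntimeException(" -1 remaining unread byte");
		}, " -1 remaining unread byte");
		runExpectingMessage(() -> { }, null); // success case: no expectation
		System.out.println("ok");
	}
}
```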
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r226984032
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java

```java
@@ -137,7 +162,16 @@ else if (remaining == 0) {
		// spanning record case
		if (this.spanningWrapper.hasFullRecord()) {
			// get the full record
			try {
				target.read(this.spanningWrapper.getInputView());
			} catch (EOFException e) {
				Optional deserializationError = this.spanningWrapper.getDeserializationError(1); // <- comment refers to this line
```

Review comment: For the first part, see above. Regarding the number of checks: basically, the `spanningWrapper.getDeserializationError(1)` check is for the case where we tried to consume more bytes than available; `spanningWrapper.getDeserializationError(0)` is for the case where we did not consume all the bytes we were supposed to consume. Only one of the two checks will be executed in any case. I hope this will become clearer after the refactoring.
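The two checks described in the comment can be modelled in a small standalone sketch (hypothetical names, simplified from the real deserializer): `addToReadBytes = 1` is used after an `EOFException`, counting the byte whose read attempt failed, so an overread shows up as a negative remainder; `addToReadBytes = 0` is used when moving on to the next record, so leftover bytes show up as a positive remainder.

```java
public class OverUnderReadSketch {

	// Simplified model of getDeserializationError(addToReadBytes): returns
	// null when the effective byte count matches the record length, else a
	// message with the (possibly negative) number of remaining bytes.
	static String deserializationError(int bytesConsumed, int recordLength, int addToReadBytes) {
		int effectivelyRead = bytesConsumed + addToReadBytes;
		if (effectivelyRead == recordLength) {
			return null;
		}
		return (recordLength - effectivelyRead) + " remaining unread byte(s)";
	}

	public static void main(String[] args) {
		// overread: all 10 bytes consumed, EOF hit while reading byte 11
		System.out.println(deserializationError(10, 10, 1));
		// underread: record deserialized with one byte left over
		System.out.println(deserializationError(9, 10, 0));
		// clean read after EOF on the record boundary: no error
		System.out.println(deserializationError(9, 10, 1));
	}
}
```

Only one of the two calls runs per record, which is why a single counter parameter suffices instead of two separate checks.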
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r226979613
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java

```java
@@ -549,21 +584,53 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
			}
			else {
				spillingChannel.close();
				spillingChannel = null;

				BufferedInputStream inStream =
					new BufferedInputStream(
						new FileInputStream(checkNotNull(spillFile)),
						2 * 1024 * 1024);
				this.spillFileReader = new DataInputViewStreamWrapper(inStream);
			}
		}
	}

	private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) throws IOException {
		Optional deserializationError = getDeserializationError(0);
		if (deserializationError.isPresent()) {
			throw new IOException(deserializationError.get());
		}

		deserializer.clear();

		if (leftOverData != null) {
			deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
		}
	}

	private Optional getDeserializationError(int addToReadBytes) { // <- comment refers to this line
```

Review comment: It took me a bit, but then I realised why: if this code throws an `EOFException`, `remainingSpanningBytes` will report `0` and not fail (and fall back to the thrown `EOFException` instead of our more detailed custom one):

```java
try {
	target.read(this.spanningWrapper.getInputView());
} catch (EOFException e) {
```

By simulating that we read too many bytes (which is actually true), we get our exception containing the line `-1 remaining unread byte`. It is a bit hacky, though. Let me try to refactor and add comments nonetheless.