[ 
https://issues.apache.org/jira/browse/FLINK-39162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-39162:
----------------------------
    Description: 
While working on FLINK-39140 to improve the unaligned checkpoint ITCases, I 
encountered frequent failures of the Topology.CUSTOM_PARTITIONER test case [1].

 
{code:java}
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:130)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
    at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
    at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
    at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:159)
    ... 10 more
Caused by: java.lang.IndexOutOfBoundsException
    at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:389)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readByte(NonSpanningWrapper.java:112)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:201)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
{code}
 

The failure can be reproduced reliably when upscaling the custom partitioner topology.
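
For context on why the upscaling direction matters: Flink's Partitioner interface contract is {{int partition(K key, int numPartitions)}}, so the channel a key maps to depends on the downstream parallelism. The actual partitioner used by the ITCase is in the linked source [1]; the class below is only a hypothetical modulo partitioner, as a minimal self-contained sketch:
{code:java}
// Hypothetical stand-in mirroring Flink's Partitioner<K> contract
// (int partition(K key, int numPartitions)): key -> channel index.
public class ModuloPartitionerDemo {

    static int partition(int key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }

    public static void main(String[] args) {
        // With the "upscale" start parallelism of 2, key 5 maps to channel 1.
        System.out.println(partition(5, 2));
        // After rescaling to parallelism 3, the same key maps to channel 2, so
        // in-flight records persisted by an unaligned checkpoint must be
        // rerouted to a different channel layout on recovery.
        System.out.println(partition(5, 3));
    }
}
{code}
If that rerouting goes wrong during recovery (the DemultiplexingRecordDeserializer path in the stack trace), a mismatch between the serialized in-flight data and the new channel layout could plausibly surface as the "consumed more bytes than the record had" error above.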

Based on current testing, this is highly likely a bug in the production code; the 
failure rate is over 50%.
{code:java}
// existing CUSTOM_PARTITIONER test case
new Object[] {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
// failing test case
new Object[] {"upscale", Topology.CUSTOM_PARTITIONER, 2, 3, 0L},
{code}
It is unclear why the test case only covered downscaling and not upscaling.

[1] 
[https://github.com/apache/flink/blob/9964ab4bd1b8334dec9388e1e4dac68c94488691/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java#L552]


> The data of the custom partitioner is corrupted when unaligned checkpoint is 
> enabled
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-39162
>                 URL: https://issues.apache.org/jira/browse/FLINK-39162
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Rui Fan
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
