[
https://issues.apache.org/jira/browse/FLINK-39162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Fan updated FLINK-39162:
----------------------------
Description:
While working on FLINK-39140 to improve the unaligned checkpoint ITCases, I
encountered a high frequency of failures for the test case
Topology.CUSTOM_PARTITIONER [1]:
{code:java}
Caused by: java.io.IOException: Serializer consumed more bytes than the record
had. This indicates broken serialization. If you are using custom serialization
types (Value or Writable), check their serialization methods. If you are using
a Kryo-serialized type, check the corresponding Kryo serializer.
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:130)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
at
org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
at
org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
at
org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:159)
... 10 more
Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:389)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readByte(NonSpanningWrapper.java:112)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:201)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
{code}
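For context, the IOException above is raised when the deserializer's read position advances past the declared record length. The following is a toy illustration of that length check, not Flink code (class and method names are made up for this sketch; the real check lives in NonSpanningWrapper#readInto):
{code:java}
import java.nio.ByteBuffer;

public class LengthCheckDemo {

    // Reads one length-prefixed record and verifies the "deserializer"
    // consumed no more bytes than the record declared.
    static int readRecord(ByteBuffer buffer) {
        int declaredLength = buffer.getInt(); // length prefix
        int start = buffer.position();
        int value = buffer.getInt();          // deserialize a 4-byte payload
        int consumed = buffer.position() - start;
        if (consumed > declaredLength) {
            throw new IllegalStateException(
                    "Serializer consumed more bytes than the record had: "
                            + consumed + " > " + declaredLength);
        }
        return value;
    }

    public static void main(String[] args) {
        ByteBuffer good = ByteBuffer.allocate(8);
        good.putInt(4).putInt(42);
        good.flip();
        System.out.println(readRecord(good)); // 42

        // A record whose length prefix is smaller than what the
        // deserializer reads (e.g. because the read position drifted
        // into the next record) triggers the same error.
        ByteBuffer bad = ByteBuffer.allocate(8);
        bad.putInt(2).putInt(42);
        bad.flip();
        try {
            readRecord(bad);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}
If the recovered in-flight bytes of a channel are routed to the wrong virtual channel, the deserializer starts reading mid-record and this check fires, which matches the stack trace above.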
It can be reproduced reliably when upscaling with the custom partitioner.
Based on current testing, the failure rate is over 50%, so it is highly likely
a bug in the production code.
{code:java}
// existing CUSTOM_PARTITIONER test case
new Object[] {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
// failing test case
new Object[] {"upscale", Topology.CUSTOM_PARTITIONER, 2, 3, 0L}, {code}
I do not know why the test case only covered downscaling and not upscaling.
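The upscale case changes the parallelism from 2 to 3, and a key-based custom partitioner then maps the same key to a different channel. The following hypothetical modulo partitioner (not the one used in the ITCase) shows why in-flight records captured in an unaligned checkpoint must be re-partitioned on restore:
{code:java}
public class ModuloPartitionerDemo {

    // Hypothetical custom partitioner logic, analogous to what a user's
    // Partitioner<Long> might do: route each key by key modulo channel count.
    static int selectChannel(long key, int numberOfChannels) {
        return (int) (key % numberOfChannels);
    }

    public static void main(String[] args) {
        long key = 5L;
        // Before rescale (parallelism 2) and after upscale (parallelism 3)
        // the same key maps to different channels, so in-flight records
        // persisted by an unaligned checkpoint cannot simply be replayed
        // into the old channel index.
        System.out.println(selectChannel(key, 2)); // 1
        System.out.println(selectChannel(key, 3)); // 2
    }
}
{code}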
h2. More information:
As I understand it, if a partitioner does not support unaligned checkpoints
(UC), it can fall back to aligned checkpoints (AC), just like the forward
partitioner [2], so checkpoints are merely slow for those partitioners.
Currently, however, data is corrupted for the custom partitioner:
- A slow checkpoint is expected for a partitioner with UC disabled.
- Data corruption is never expected.
That is why I think this should be treated as a bug.
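The acceptable fallback described above amounts to a simple decision. This is only a sketch with hypothetical names (the real logic involves StreamPartitioner and the checkpoint barrier handling), but it captures the expectation: an unsupported partitioner should cost checkpoint speed, never correctness.
{code:java}
public class CheckpointModeDemo {

    enum CheckpointMode { UNALIGNED, ALIGNED }

    // Hypothetical flag mirroring whether a given partitioner supports
    // unaligned checkpoints; partitioners like forward may opt out.
    static CheckpointMode chooseMode(boolean partitionerSupportsUnaligned,
                                     boolean unalignedEnabled) {
        if (unalignedEnabled && partitionerSupportsUnaligned) {
            return CheckpointMode.UNALIGNED;
        }
        // Fall back to aligned checkpoints: slower under backpressure,
        // but no in-flight data has to be persisted and re-routed.
        return CheckpointMode.ALIGNED;
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(false, true)); // ALIGNED
        System.out.println(chooseMode(true, true));  // UNALIGNED
    }
}
{code}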
[1]
[https://github.com/apache/flink/blob/9964ab4bd1b8334dec9388e1e4dac68c94488691/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java#L552]
[2]
https://github.com/apache/flink/blob/4fb13082da9e15eaa20392db0f1ad21e83349cfa/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java#L89
> The data of the custom partitioner is corrupted when unaligned checkpoint is
> enabled
> ------------------------------------------------------------------------------------
>
> Key: FLINK-39162
> URL: https://issues.apache.org/jira/browse/FLINK-39162
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Reporter: Rui Fan
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)