Hello,

I am having trouble restoring a state (a POJO) after deleting a field.
According to the documentation, this should not be a problem with POJOs:
*"Fields can be removed. Once removed, the previous value for the removed
field will be dropped in future checkpoints and savepoints."*
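
For illustration, the kind of change involved looks roughly like the sketch below. The class and field names are hypothetical (they are not the real job classes), and in the real job the class keeps the same name; the V1/V2 suffixes only exist so that both versions fit in one file.

```java
import java.lang.reflect.Field;
import java.util.Set;
import java.util.TreeSet;

class RemovedFieldSketch {

    // Collects the declared (non-synthetic) field names of a class.
    static Set<String> fieldNames(Class<?> clazz) {
        Set<String> names = new TreeSet<>();
        for (Field f : clazz.getDeclaredFields()) {
            if (!f.isSynthetic()) {
                names.add(f.getName());
            }
        }
        return names;
    }

    // Names that exist in the old version but not in the new one.
    static Set<String> removedFields(Class<?> before, Class<?> after) {
        Set<String> removed = fieldNames(before);
        removed.removeAll(fieldNames(after));
        return removed;
    }

    public static void main(String[] args) {
        System.out.println("removed: " + removedFields(SensorStateV1.class, SensorStateV2.class));
    }
}

// Hypothetical POJO as it was when the checkpoint/savepoint was written.
class SensorStateV1 {
    public String id;
    public long count;
    public String obsoleteNote; // the field that gets deleted

    public SensorStateV1() {}
}

// Hypothetical POJO after the field was deleted (same class name in the real job).
class SensorStateV2 {
    public String id;
    public long count;

    public SensorStateV2() {}
}
```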

Here is a short stack trace (the full trace is below):

Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)


After some debugging, it seems that the deleted POJO field still has a field
serializer in the "fieldSerializers" array of the corresponding PojoSerializer
object, but it is no longer present in "fields", where we have a gap of one
index (for example 0-1-3-4).
So when the serializer reaches index 2, we get this NPE.
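
To make the suspected mechanism concrete, here is a minimal sketch. It is not Flink code: it assumes the gap shows up as a null entry at index 2 of the fields array and that the constructor touches every entry in turn. The class names EventV2 and FieldGapSketch and both helper methods are hypothetical.

```java
import java.lang.reflect.Field;

class FieldGapSketch {

    // Looks up each name on the class. A name that no longer exists leaves a
    // null slot, mimicking the 0-1-3-4 gap (assumption: the gap is a null entry).
    static Field[] resolveFields(Class<?> clazz, String... names) {
        Field[] fields = new Field[names.length];
        for (int i = 0; i < names.length; i++) {
            try {
                fields[i] = clazz.getDeclaredField(names[i]);
            } catch (NoSuchFieldException e) {
                fields[i] = null; // removed field: the slot stays empty
            }
        }
        return fields;
    }

    // Touches every slot in order and returns the first index that throws
    // a NullPointerException, or -1 if every slot is populated.
    static int firstFailingIndex(Field[] fields) {
        for (int i = 0; i < fields.length; i++) {
            try {
                fields[i].setAccessible(true);
            } catch (NullPointerException npe) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // "c" is the field that was deleted from the (hypothetical) POJO.
        Field[] fields = resolveFields(EventV2.class, "a", "b", "c", "d", "e");
        System.out.println("first failing index: " + firstFailingIndex(fields));
    }
}

// Hypothetical stand-in for the POJO after the field "c" was removed.
class EventV2 {
    public int a;
    public int b;
    public int d;
    public int e;
}
```

With five serializer slots but only four resolvable fields, the loop fails at index 2, which matches what I see in the debugger.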

Why is the serializer for the deleted field still present? Shouldn't it have
been dropped when resolving schema compatibility?
I cannot find anything on this matter; could someone help me with it?
I reproduced this in Flink 1.13 and 1.14, and I could not find any related
JIRA issue either.

Best Regards,
Bastien

Full stack trace:

2022-02-02 15:44:20
java.io.IOException: Could not perform checkpoint 2737490 for operator OperatorXXX
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2737490 for operator OperatorXXX
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
    ... 22 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.<init>(RegisteredOperatorStateBackendMetaInfo.java:61)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.deepCopy(RegisteredOperatorStateBackendMetaInfo.java:96)
    at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:63)
    at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:76)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:227)
    ... 33 more
