Naci Simsek created FLINK-38307:
-----------------------------------
Summary: Resuming from both savepoints and checkpoints is NOT
supported for Changelog State Backend
Key: FLINK-38307
URL: https://issues.apache.org/jira/browse/FLINK-38307
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.19.1, 1.18.1, 1.19.0, 1.18.0
Environment: The code that throws the exception ({{StreamTask#checkForcedFullSnapshotSupport}}) is:
{code:java}
private void checkForcedFullSnapshotSupport(CheckpointOptions checkpointOptions) {
    if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)
            && !stateBackend.supportsNoClaimRestoreMode()) {
        throw new IllegalStateException(
                String.format(
                        "Configured state backend (%s) does not support enforcing a full"
                                + " snapshot. If you are restoring in %s mode, please"
                                + " consider choosing %s mode.",
                        stateBackend, RecoveryClaimMode.NO_CLAIM, RecoveryClaimMode.CLAIM));
    } else if (checkpointOptions.getCheckpointType().isSavepoint()) {
        SavepointType savepointType = (SavepointType) checkpointOptions.getCheckpointType();
        if (!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
            throw new IllegalStateException(
                    String.format(
                            "Configured state backend (%s) does not support %s savepoints",
                            stateBackend, savepointType.getFormatType()));
        }
    }
} {code}
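The configured restore mode (*{{CLAIM}}* here) is not even checked before the exception is thrown: the first branch looks only at the checkpoint type and at whether the state backend supports {{NO_CLAIM}} restore mode.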
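To make the point above concrete, here is a minimal, self-contained model of the branch logic in the snippet. It is a sketch, not Flink code: {{Kind}}, {{RestoreMode}}, {{Backend}}, {{check}} and {{throwsFor}} are hypothetical stand-ins for {{CheckpointType}}/{{SavepointType}}, the restore mode setting and the two {{StateBackend}} capability methods. The {{supportsNoClaimRestoreMode}} flag is set to {{false}} because that is what the exception in the log implies for {{ChangelogStateBackend}}; the savepoint format set is an arbitrary assumption.

{code:java}
import java.util.Set;

public class ForcedFullSnapshotCheckModel {

    // Hypothetical stand-in for the checkpoint/savepoint types a task can receive.
    enum Kind { CHECKPOINT, FULL_CHECKPOINT, SAVEPOINT }

    // Hypothetical stand-in for the configured restore (claim) mode.
    enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

    // Hypothetical stand-in for the two state backend capabilities the check queries.
    record Backend(String name, boolean supportsNoClaimRestoreMode, Set<String> savepointFormats) {}

    // Same branch structure as checkForcedFullSnapshotSupport. The configured restore
    // mode is passed in only to make the point explicit: neither branch reads it.
    static void check(Kind kind, String savepointFormat, Backend backend, RestoreMode configuredMode) {
        if (kind == Kind.FULL_CHECKPOINT && !backend.supportsNoClaimRestoreMode()) {
            throw new IllegalStateException(
                    "Configured state backend (" + backend.name()
                            + ") does not support enforcing a full snapshot.");
        } else if (kind == Kind.SAVEPOINT && !backend.savepointFormats().contains(savepointFormat)) {
            throw new IllegalStateException(
                    "Configured state backend (" + backend.name()
                            + ") does not support " + savepointFormat + " savepoints");
        }
    }

    // Returns true if check(...) throws IllegalStateException for the given inputs.
    static boolean throwsFor(Kind kind, String savepointFormat, Backend backend, RestoreMode mode) {
        try {
            check(kind, savepointFormat, backend, mode);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // false mirrors what the log implies; the format set is an assumption.
        Backend changelog = new Backend("ChangelogStateBackend", false, Set.of("CANONICAL"));
        for (RestoreMode mode : RestoreMode.values()) {
            System.out.println(mode
                    + ": FULL_CHECKPOINT throws=" + throwsFor(Kind.FULL_CHECKPOINT, "CANONICAL", changelog, mode)
                    + ", CHECKPOINT throws=" + throwsFor(Kind.CHECKPOINT, "CANONICAL", changelog, mode));
        }
    }
}
{code}

Running {{main}} prints one line per mode. For {{CLAIM}}, {{NO_CLAIM}} and {{LEGACY}} alike, the {{FULL_CHECKPOINT}} case throws and the plain {{CHECKPOINT}} case does not, because the configured mode never enters either condition.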
Reporter: Naci Simsek
Attachments: flink_changelog_restore_logs.zip
Start a Flink deployment with the following settings:
{code:java}
execution.savepoint-restore-mode, CLAIM
execution.checkpointing.interval, 20s
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints, 1 {code}
Wait for a couple of checkpoints to complete, then cancel the job.
Then start a new Flink job that resumes from either an existing checkpoint or a savepoint (it does NOT matter which), with the changelog state backend enabled as follows:
{code:java}
execution.savepoint-restore-mode, CLAIM
state.backend.changelog.enabled, true
execution.checkpointing.interval, 20s
dstl.dfs.base-path, file:///flink/flink_binary_releases/flink-1.18.1/dstl
state.backend.changelog.storage, filesystem
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints, 1{code}
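As a quick cross-check of the reproduction, the sketch below diffs the two settings lists above (values copied verbatim from this report; the class and method names are hypothetical). It shows that the restoring run only adds the three changelog-related keys and leaves {{execution.savepoint-restore-mode}} unchanged at {{CLAIM}}.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class RestoreConfigDiff {

    // Builds an insertion-ordered map from alternating key/value arguments.
    static Map<String, String> ordered(String... keyValues) {
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    // Settings of the first run (the job that produced the checkpoints).
    static final Map<String, String> FIRST_RUN = ordered(
            "execution.savepoint-restore-mode", "CLAIM",
            "execution.checkpointing.interval", "20s",
            "execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION",
            "execution.checkpointing.max-concurrent-checkpoints", "1");

    // Settings of the second run (the job restoring with changelog enabled).
    static final Map<String, String> SECOND_RUN = ordered(
            "execution.savepoint-restore-mode", "CLAIM",
            "state.backend.changelog.enabled", "true",
            "execution.checkpointing.interval", "20s",
            "dstl.dfs.base-path", "file:///flink/flink_binary_releases/flink-1.18.1/dstl",
            "state.backend.changelog.storage", "filesystem",
            "execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION",
            "execution.checkpointing.max-concurrent-checkpoints", "1");

    // Returns entries of `after` whose key is missing from `before` or whose value differs.
    static Map<String, String> addedOrChanged(Map<String, String> before, Map<String, String> after) {
        Map<String, String> diff = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            if (!e.getValue().equals(before.get(e.getKey()))) {
                diff.put(e.getKey(), e.getValue());
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        addedOrChanged(FIRST_RUN, SECOND_RUN).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
{code}

Running it prints {{state.backend.changelog.enabled}}, {{dstl.dfs.base-path}} and {{state.backend.changelog.storage}} with their values, in that order.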
*Expected result:*
As stated in the Flink documentation, the job should be restored successfully and continue to operate normally:
[https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/state_backends/#upgrading-existing-jobs:~:text=Enabling%20Changelog-,Resuming%20from%20both%20savepoints%20and%20checkpoints%20is%20supported%3A,-given%20an%20existing]
*Actual result:*
At first, the job appears to be restored successfully, until it performs its first checkpoint.
As soon as a checkpoint is triggered, Flink throws the following exception:
{code:java}
2025-08-30 13:22:56,176 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 16 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1756552976166 for job 356832174120c71afcaabdd5a46a60d9.
2025-08-30 13:22:56,185 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Taking a state snapshot on operator Source: Car data generator source for checkpoint 16
2025-08-30 13:22:56,219 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Car data generator source -> Timestamps/Watermarks (1/1) (202269b62aa7de30d8c9821514af0256_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:54887-436fc1 @ localhost (dataPort=54889).
java.lang.Exception: Error while triggering checkpoint 16 for Source: Car data generator source -> Timestamps/Watermarks (1/1)#0
	at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1359) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:1023) ~[flink-dist-1.18.1.jar:1.18.1]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
Caused by: java.lang.IllegalStateException: Configured state backend (org.apache.flink.state.changelog.ChangelogStateBackend@10ae5aae) does not support enforcing a full snapshot. If you are restoring in NO_CLAIM mode, please consider choosing either CLAIM or LEGACY restore mode.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkForcedFullSnapshotSupport(StreamTask.java:1355) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsync(StreamTask.java:1138) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointNowAsync(SourceOperatorStreamTask.java:181) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointAsync(SourceOperatorStreamTask.java:136) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1335) ~[flink-dist-1.18.1.jar:1.18.1]
	... 31 more {code}
This exception makes it impossible to enable the changelog state backend for pipelines that depend on restoring existing state.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)