All checkpointing-related features do not work in batch mode.
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/#important-considerations
On 04/08/2021 21:23, tobias.schu...@xing.com wrote:
He folks,
This is a crosspost of a stack overflow question
(https://stackoverflow.com/questions/68631624/flink-job-cant-use-savepoint-in-a-batch-job
<https://stackoverflow.com/questions/68631624/flink-job-cant-use-savepoint-in-a-batch-job>)
which didn’t get any replies yet so please bare with me.
Let me start in a generic fashion to see if I somehow missed some
concepts: I have a streaming flink job from which I created a
savepoint and try to reuse that save point in the same job running in
batch-mode. Simplified version of this job looks like this
Pseduo-Code:
val flink = StreamExecutionEnvironment.getExecutionEnvironment
val stream = if (batchMode) {
flink.readFile(path)
}
else {
flink.addKafkaSource(topicName)
}
stream.keyBy(key)
stream.process(new ProcessorWithKeyedState())
CassandraSink.addSink(stream)
This works fine as long as I run the job without a savepoint. If I
start the job from a savepoint I get an exception which looks like this
Caused by: java.lang.UnsupportedOperationException: Checkpoints are not
supported in a single key state backend
at
org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
at
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
at
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
I could work around this if I set the option:
execution.batch-state-backend.enabled: false
but this eventually results in another error:
Caused by: java.lang.IllegalArgumentException: The fraction of memory to
allocate should not be 0. Please make sure that all types of managed memory
consumers contained in the job are configured with a non-negative weight via
`taskmanager.memory.managed.consumer-weights`.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at
org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
at
org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
at
org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)
Of course I tried to set the config key
|taskmanager.memory.managed.consumer-weights| (used
|DATAPROC:70,PYTHON:30|) but this doesn't seems to have any effects.
So I wonder if I have a conceptual error and can't reuse savepoints
from a streaming job in a batch job or if I simply have a problem in
my configuration. Any hints?
Thanks in advance,
Tobi