None of the checkpointing-related features work in batch mode; this includes restoring from a savepoint.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/#important-considerations
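
As a rough illustration of the limitation above, here is a minimal Scala sketch of a job that picks its runtime mode explicitly. The `--batch` flag, the object name, and the in-memory input are assumptions made for the example and do not come from the original job. It also assumes Flink 1.12 or later with the Scala DataStream API on the classpath. The point is only that a BATCH run should be submitted without a savepoint (no `-s` on `flink run`), while a STREAMING run can restore from one.

```scala
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

object ModeSwitchSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical flag; how the original job derives `batchMode` is not shown.
    val batchMode = args.contains("--batch")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // BATCH mode disables checkpointing and everything that depends on it,
    // so a job started in this mode should not be given a savepoint path.
    env.setRuntimeMode(
      if (batchMode) RuntimeExecutionMode.BATCH
      else RuntimeExecutionMode.STREAMING)

    // Placeholder bounded input standing in for the file and Kafka sources.
    env.fromElements("a", "b", "a")
      .keyBy(s => s)
      .map(s => s.toUpperCase)
      .print()

    env.execute("mode-switch-sketch")
  }
}
```

With this layout the streaming variant can still be started with `flink run -s <savepointPath> ...`, whereas the batch variant would be started without `-s`.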

On 04/08/2021 21:23, tobias.schu...@xing.com wrote:
Hi folks,

This is a crosspost of a Stack Overflow question (https://stackoverflow.com/questions/68631624/flink-job-cant-use-savepoint-in-a-batch-job) which hasn't received any replies yet, so please bear with me.

Let me start in a generic fashion to see if I somehow missed some concepts: I have a streaming Flink job from which I created a savepoint, and I am trying to reuse that savepoint in the same job running in batch mode. A simplified version of this job looks like this:
Pseudo-code:
val flink = StreamExecutionEnvironment.getExecutionEnvironment
val stream = if (batchMode) {
   flink.readFile(path)
}
else {
   flink.addKafkaSource(topicName)
}

// keyBy and process each return a new stream, so chain them and sink the result
val processed = stream
   .keyBy(key)
   .process(new ProcessorWithKeyedState())

CassandraSink.addSink(processed)
This works fine as long as I run the job without a savepoint. If I start the job from a savepoint, I get an exception which looks like this:
Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
     at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
     at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
I could work around this if I set the option:
execution.batch-state-backend.enabled: false
but this eventually results in another error:
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
     at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
     at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)
Of course I tried to set the config key taskmanager.memory.managed.consumer-weights (using DATAPROC:70,PYTHON:30), but this doesn't seem to have any effect. So I wonder whether I have a conceptual error and can't reuse savepoints from a streaming job in a batch job, or whether I simply have a problem in my configuration. Any hints?

Thanks in advance,
Tobi

