Hi all, I have been struggling with this issue for a couple of days now. Checkpointing appears to fail because the source task (reading from a Kinesis stream in this case) appears to be in a FINISHED state.
Excerpt from the JobManager logs:

2021-11-25 12:52:00,479 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Source Events/Read(KinesisSource) -> Flat Map -> Source Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> Window/Window.Assign.out -> ToBinaryKeyedWorkItem (1/2) (eb31cbc4e319588ba79a26d26abcd2f3) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,494 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Source Events/Read(KinesisSource) -> Flat Map -> Source Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) (1eae72b5680529fbd3b4becadb803910) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,569 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - GroupByKey -> ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState) -> Reduce state/ParMultiDo(ReduceState) -> Store state/ParMultiDo(StoreState) (1/2) (1a77c7ed026ac4e4a59ab66876053102) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,582 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - GroupByKey -> ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState) -> Reduce state/ParMultiDo(ReduceState) -> Store state/ParMultiDo(StoreState) (2/2) (31588d4dad22821d7226ec65687d0edb) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,881 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Source Events/Read(KinesisSource) -> Flat Map -> Source Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) (1eae72b5680529fbd3b4becadb803910) switched from RUNNING to FINISHED.
2021-11-25 12:52:06,528 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint triggering task Source: Source Events/Read(KinesisSource) -> Flat Map -> Source Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) of job 00000000000000000000000000000000 is not in state RUNNING but FINISHED instead. Aborting checkpoint.

For context, here is an excerpt from the flink-conf.yaml file (the {{ ... }} placeholders are Helm template values):

flink-conf.yaml: |+
  # TaskManager configurations
  taskmanager.numberOfTaskSlots: 2
  taskmanager.rpc.port: 6122
  taskmanager.memory.process.size: 1728m
  # JobManager configurations
  jobmanager.rpc.address: {{ $fullName }}-jobmanager
  jobmanager.rpc.port: 6123
  jobmanager.memory.process.size: 1600m
  blob.server.port: 6124
  queryable-state.proxy.ports: 6125
  parallelism.default: 1 # default parallelism when not defined elsewhere
  kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods.
  scheduler-mode: reactive
  # High-availability configurations
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  high-availability.storageDir: s3://company-flink-{{ .Values.environment }}/recovery
  hive.s3.use-instance-credentials: true
  kubernetes.cluster-id: {{ $fullName }}
  # Checkpoint and State backend
  state.backend: rocksdb
  state.checkpoint-storage: filesystem # jobmanager or filesystem
  state.backend.incremental: true # only supported by rocksdb
  state.checkpoints.dir: s3://company-flink-{{ .Values.environment }}/checkpoints
  execution.checkpointing.interval: 20 min
  execution.checkpointing.min-pause: 10 min # minimum time between checkpoints to reduce overhead
  state.checkpoints.num-retained: 1 # maximum number of completed checkpoints to retain
  # Fault tolerance
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.delay: 10 s
  restart-strategy.fixed-delay.attempts: 3 # try n times before the job is considered failed

From what I can see the job itself is still running, but checkpointing keeps failing. After finding this issue (https://issues.apache.org/jira/browse/FLINK-2491) I updated the default parallelism from 2 to 1, since our current Kinesis stream consists of a single shard, but the problem persists.
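In case it helps, here is a minimal sketch of how the source side of the pipeline is wired up: Beam's KinesisIO on the Flink runner. This is not the actual job; the stream name, region, and credentials are placeholders, and everything downstream of the source (parsing, keying, windowing, the stateful ParDos) is elided.

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

// Simplified sketch, not the production job.
public class EventsPipelineSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setParallelism(1); // mirrors parallelism.default: 1 above (was 2 before)

    Pipeline p = Pipeline.create(options);

    // "Source Events" from the logs; the stream has a single shard.
    // Stream name, credentials and region below are placeholders.
    PCollection<KinesisRecord> events =
        p.apply("Source Events",
            KinesisIO.read()
                .withStreamName("events-stream")
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
                .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.EU_WEST_1));

    // ... Parse Events / Key Events / Window / GroupByKey / Reduce state elided ...

    p.run().waitUntilFinish();
  }
}

Any ideas?

Jonas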