Hi all,

I have been struggling with this issue for a couple of days now.
Checkpointing keeps failing because the source task (a Kinesis stream consumer in this
case) is reported to be in a FINISHED state.

Excerpt from the JobManager logs:

2021-11-25 12:52:00,479 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Source Events/Read(KinesisSource) -> Flat Map -> Source
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (1/2)
(eb31cbc4e319588ba79a26d26abcd2f3) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,494 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Source Events/Read(KinesisSource) -> Flat Map -> Source
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2)
(1eae72b5680529fbd3b4becadb803910) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,569 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
GroupByKey -> ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState)
-> Reduce state/ParMultiDo(ReduceState) -> Store
state/ParMultiDo(StoreState) (1/2) (1a77c7ed026ac4e4a59ab66876053102)
switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,582 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
GroupByKey -> ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState)
-> Reduce state/ParMultiDo(ReduceState) -> Store
state/ParMultiDo(StoreState) (2/2) (31588d4dad22821d7226ec65687d0edb)
switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,881 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Source Events/Read(KinesisSource) -> Flat Map -> Source
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2)
(1eae72b5680529fbd3b4becadb803910) switched from RUNNING to FINISHED.
2021-11-25 12:52:06,528 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
Checkpoint triggering task Source: Source Events/Read(KinesisSource) ->
Flat Map -> Source Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) of job
00000000000000000000000000000000 is not in state RUNNING but FINISHED
instead. Aborting checkpoint.

For context, here is an excerpt from the flink-conf.yaml file:

flink-conf.yaml: |+
# TaskManager configurations
taskmanager.numberOfTaskSlots: 2
taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 1728m
# JobManager configurations
jobmanager.rpc.address: {{ $fullName }}-jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m

blob.server.port: 6124
queryable-state.proxy.ports: 6125
parallelism.default: 1 # default parallelism when not defined elsewhere
kubernetes.namespace: {{ $fullName }} # the namespace used for running the jobmanager and taskmanager pods
scheduler-mode: reactive

# High-availability configurations
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://company-flink-{{ .Values.environment }}/recovery
hive.s3.use-instance-credentials: true
kubernetes.cluster-id: {{ $fullName }}

# Checkpoint and State backend
state.backend: rocksdb
state.checkpoint-storage: filesystem # jobmanager or filesystem
state.backend.incremental: true # only supported by rocksdb
state.checkpoints.dir: s3://company-flink-{{ .Values.environment }}/checkpoints
execution.checkpointing.interval: 20 min
execution.checkpointing.min-pause: 10 min # minimum time between checkpoints to reduce overhead
state.checkpoints.num-retained: 1 # maximum number of completed checkpoints to retain

# Fault tolerance
restart-strategy: fixed-delay
restart-strategy.fixed-delay.delay: 10 s
restart-strategy.fixed-delay.attempts: 3 # try n times before the job is considered failed
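
(In case it matters how these settings reach the job: as far as I understand, the
checkpoint interval and min-pause could also be passed through the Beam Flink runner's
pipeline options instead of flink-conf.yaml. A rough sketch of what I mean, with the
values above converted to milliseconds; we currently only set them in flink-conf.yaml:)

// Sketch only: the same checkpoint settings expressed as Beam FlinkPipelineOptions,
// mirroring the flink-conf.yaml excerpt above.
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointOptionsSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setCheckpointingInterval(20 * 60 * 1000L);       // 20 min
    options.setMinPauseBetweenCheckpoints(10 * 60 * 1000L);  // 10 min
  }
}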

From what I can see the job is still running, but checkpointing keeps failing.
After finding this issue (https://issues.apache.org/jira/browse/FLINK-2491) I updated
the default parallelism from 2 to 1, since our current Kinesis stream consists of a
single shard, but the problem persists.
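
In case it matters where the parallelism is picked up from: we only changed
parallelism.default in flink-conf.yaml. As far as I understand it could also be pinned
on the Beam side when launching the job, roughly like this (a sketch; the launch code
is simplified and not our exact code):

// Sketch only: pinning the job parallelism via the Beam Flink runner options
// instead of (or in addition to) parallelism.default in flink-conf.yaml.
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParallelismSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setParallelism(1);  // match the single Kinesis shard
    Pipeline p = Pipeline.create(options);
    // ... build the pipeline (KinesisIO source, parsing, windowing, GroupByKey, state) ...
    p.run().waitUntilFinish();
  }
}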

Any ideas?

Jonas
