[
https://issues.apache.org/jira/browse/FLINK-21685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298757#comment-17298757
]
Yang Wang commented on FLINK-21685:
-----------------------------------
Could you please run the example job with HA enabled, using the following
YAMLs? This is exactly what I used to try to reproduce your issue.
jobmanager.job
{code:yaml}
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      initContainers:
      - name: artifacts-fetcher
        image: apache/flink:1.12.2
        imagePullPolicy: IfNotPresent
        # Use wget or other tools to get user jars from remote storage
        command: ['cp',
                  '/opt/flink/examples/streaming/StateMachineExample.jar',
                  '/flink-artifact/myjob.jar']
        volumeMounts:
        - mountPath: /flink-artifact
          name: flink-artifact
      containers:
      - name: jobmanager
        image: apache/flink:1.12.2
        imagePullPolicy: Always
        env:
        - name: ENABLE_BUILT_IN_PLUGINS
          value: flink-oss-fs-hadoop-1.12.2.jar
        - name: _POD_IP_ADDRESS
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        args: ["standalone-job", "--job-classname",
               "org.apache.flink.streaming.examples.statemachine.StateMachineExample",
               "--host", "$(_POD_IP_ADDRESS)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - mountPath: /opt/flink/usrlib
          name: flink-artifact
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from the official Flink image; change if necessary
      volumes:
      - name: flink-artifact
        emptyDir: {}
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
{code}
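For completeness: flink-conf.yaml below sets {{jobmanager.rpc.address: flink-jobmanager}}, so the setup also needs an internal Service of that name in front of the JobManager pod, same as in the standalone Kubernetes docs. A minimal sketch (with HA enabled the TaskManagers discover the leader through the HA ConfigMaps anyway, but the blob server and web UI still go through this Service):
{code:yaml}
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
{code}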
taskmanager.yaml
{code:yaml}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      initContainers:
      - name: artifacts-fetcher
        image: apache/flink:1.12.2
        imagePullPolicy: IfNotPresent
        # Use wget or other tools to get user jars from remote storage
        command: ['cp',
                  '/opt/flink/examples/streaming/StateMachineExample.jar',
                  '/flink-artifact/myjob.jar']
        volumeMounts:
        - mountPath: /flink-artifact
          name: flink-artifact
      containers:
      - name: taskmanager
        image: apache/flink:1.12.2
        imagePullPolicy: Always
        env:
        - name: ENABLE_BUILT_IN_PLUGINS
          value: flink-oss-fs-hadoop-1.12.2.jar
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - mountPath: /opt/flink/usrlib
          name: flink-artifact
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from the official Flink image; change if necessary
      volumes:
      - name: flink-artifact
        emptyDir: {}
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
{code}
flink-configuration-configmap.yaml
{code:yaml}
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
    kubernetes.cluster-id: standalone-k8s-ha-app1
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: oss://flink-debug-yiqi/flink-ha
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    fs.oss.endpoint: oss-cn-beijing.aliyuncs.com
    fs.oss.accessKeyId: xxxx
    fs.oss.accessKeySecret: yyyy
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

    logger.rest.name = org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
    logger.rest.level = ERROR
    logger.minirest.name = org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
    logger.minirest.level = ERROR
{code}
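One more thing worth double-checking for the Kubernetes HA services: the service account that the JobManager and TaskManager pods run under must be allowed to create, watch, and edit ConfigMaps, because leader election and the checkpoint/job pointers are stored there. A minimal sketch with placeholder names (bind it to whatever service account your pods actually use):
{code:yaml}
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps   # placeholder name
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps   # placeholder name
subjects:
- kind: ServiceAccount
  name: default               # service account of the Flink pods
roleRef:
  kind: Role
  name: flink-ha-configmaps
  apiGroup: rbac.authorization.k8s.io
{code}
Without these permissions the JobManager cannot write its leader address into the HA ConfigMaps at all, which is a different failure mode from the one in your log, but it is cheap to rule out.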
jobmanager-rest-service.yaml
{code:yaml}
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: LoadBalancer
  ports:
  - name: rest
    port: 8081
  selector:
    app: flink
    component: jobmanager
{code}
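If you want to bring everything up in one shot, a hypothetical kustomization.yaml listing the files above (names as used in this comment) would let you run {{kubectl apply -k .}}:
{code:yaml}
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- flink-configuration-configmap.yaml
- jobmanager-rest-service.yaml
- jobmanager.job
- taskmanager.yaml
{code}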
> Flink JobManager failed to restart from checkpoint in kubernetes HA setup
> -------------------------------------------------------------------------
>
> Key: FLINK-21685
> URL: https://issues.apache.org/jira/browse/FLINK-21685
> Project: Flink
> Issue Type: Bug
> Components: Deployment / Kubernetes
> Affects Versions: 1.12.1, 1.12.2
> Reporter: Peng Zhang
> Priority: Major
> Attachments: 03-config.yaml, 06-jobmanager-deployment.yaml,
> 08-taskmanager-deployment.yaml, flink-ha.log
>
>
> We use a Flink K8s session cluster in HA mode (1 JobManager and 4
> TaskManagers). When jobs are running in Flink and the JobManager is
> restarted, the JobManager fails to recover the jobs from their checkpoints:
> {code}
> 2021-03-08 13:16:42,962 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Trying to fetch 1 checkpoints from storage.
> 2021-03-08 13:16:42,962 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Trying to fetch 1 checkpoints from storage.
> 2021-03-08 13:16:42,962 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Trying to retrieve checkpoint 1.
> 2021-03-08 13:16:43,014 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring
> job 9a534b2e309b24f78866b65d94082ead from Checkpoint 1 @ 1615208258041 for
> 9a534b2e309b24f78866b65d94082ead located at
> s3a://zalando-stellar-flink-state-eu-central-1-staging/checkpoints/9a534b2e309b24f78866b65d94082ead/chk-1.
>
> 2021-03-08 13:16:43,023 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master
> state to restore
> 2021-03-08 13:16:43,024 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Using failover strategy
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@58d927d2
> for BrandCollectionTrackingJob (9a534b2e309b24f78866b65d94082ead).
> 2021-03-08 13:16:43,046 INFO
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - JobManager
> runner for job BrandCollectionTrackingJob (9a534b2e309b24f78866b65d94082ead)
> was granted leadership with session id c258d8ce-69d3-49df-8bee-1b748d5bbe74
> at akka.tcp://[email protected]:6123/user/rpc/jobmanager_2.
> 2021-03-08 13:16:43,060 WARN akka.remote.transport.netty.NettyTransport
> [] - Remote connection to [null] failed with
> java.net.NoRouteToHostException: No route to host
> 2021-03-08 13:16:43,060 WARN akka.remote.ReliableDeliverySupervisor
> [] - Association with remote system
> [akka.tcp://[email protected]:6123] has failed, address is now gated for
> [50] ms. Reason: [Association failed with
> [akka.tcp://[email protected]:6123]] Caused by:
> [java.net.NoRouteToHostException: No route to host]
> {code}
> Attached are the log and our configuration.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)