[ 
https://issues.apache.org/jira/browse/FLINK-21685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298757#comment-17298757
 ] 

Yang Wang commented on FLINK-21685:
-----------------------------------

Could you please run the example job with HA enabled by using the following 
YAMLs? This is exactly what I used when trying to reproduce your issue.

 

jobmanager-job.yaml
{code:yaml}
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      initContainers:
        - name: artifacts-fetcher
          image: apache/flink:1.12.2
          imagePullPolicy: IfNotPresent
          # Use wget or other tools to get user jars from remote storage
          command: [ 'cp', '/opt/flink/examples/streaming/StateMachineExample.jar', '/flink-artifact/myjob.jar' ]
          volumeMounts:
            - mountPath: /flink-artifact
              name: flink-artifact
      containers:
        - name: jobmanager
          image: apache/flink:1.12.2
          imagePullPolicy: Always
          env:
          - name: ENABLE_BUILT_IN_PLUGINS
            value: flink-oss-fs-hadoop-1.12.2.jar
          - name: _POD_IP_ADDRESS
            valueFrom:
              fieldRef:
                apiVersion: v1
                fieldPath: status.podIP
          args: ["standalone-job", "--job-classname", 
"org.apache.flink.streaming.examples.statemachine.StateMachineExample", 
"--host", "$(_POD_IP_ADDRESS)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - mountPath: /opt/flink/usrlib
              name: flink-artifact
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-artifact
          emptyDir: { }
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
{code}
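One note: the ConfigMap below sets {{jobmanager.rpc.address: flink-jobmanager}}, while the only Service included in this comment is the REST one. If your setup also needs the internal JobManager Service, a minimal sketch could look like the following (the name and type are assumptions; the ports are taken from the job spec above):
{code:yaml}
# Hypothetical internal Service for the JobManager; not part of the manifests
# pasted in this comment. Name and type are assumptions, ports come from the
# jobmanager job spec above.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
{code}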
taskmanager.yaml
{code:yaml}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      initContainers:
        - name: artifacts-fetcher
          image: apache/flink:1.12.2
          imagePullPolicy: IfNotPresent
          # Use wget or other tools to get user jars from remote storage
          command: [ 'cp', '/opt/flink/examples/streaming/StateMachineExample.jar', '/flink-artifact/myjob.jar' ]
          volumeMounts:
            - mountPath: /flink-artifact
              name: flink-artifact
      containers:
      - name: taskmanager
        image: apache/flink:1.12.2
        imagePullPolicy: Always
        env:
        - name: ENABLE_BUILT_IN_PLUGINS
          value: flink-oss-fs-hadoop-1.12.2.jar
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - mountPath: /opt/flink/usrlib
          name: flink-artifact
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-artifact
        emptyDir: { }
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
{code}
flink-configuration-configmap.yaml
{code:yaml}
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
    kubernetes.cluster-id: standalone-k8s-ha-app1
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: oss://flink-debug-yiqi/flink-ha
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    fs.oss.endpoint: oss-cn-beijing.aliyuncs.com
    fs.oss.accessKeyId: xxxx
    fs.oss.accessKeySecret: yyyy
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

    logger.rest.name = org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
    logger.rest.level = ERROR
    logger.minirest.name = org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
    logger.minirest.level = ERROR
{code}
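Also note that the Kubernetes HA services store leader and job metadata in ConfigMaps, so the service account used by the pods must be allowed to create and edit ConfigMaps. The manifests above do not set {{serviceAccountName}}, so the pods run as the namespace's {{default}} service account; one possible way to grant the permission is sketched below (the Role/RoleBinding names are placeholders chosen for illustration):
{code:yaml}
# Hypothetical RBAC objects; the names are placeholders. This grants the
# "default" service account (used by the pods above) full access to ConfigMaps,
# which the Kubernetes HA services need for leader election and job metadata.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmap-access
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmap-access
subjects:
  - kind: ServiceAccount
    name: default
roleRef:
  kind: Role
  name: flink-ha-configmap-access
  apiGroup: rbac.authorization.k8s.io
{code}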
 

jobmanager-rest-service.yaml
{code:yaml}
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: LoadBalancer
  ports:
  - name: rest
    port: 8081
  selector:
    app: flink
    component: jobmanager
{code}

> Flink JobManager failed to restart from checkpoint in kubernetes HA setup
> -------------------------------------------------------------------------
>
>                 Key: FLINK-21685
>                 URL: https://issues.apache.org/jira/browse/FLINK-21685
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.12.1, 1.12.2
>            Reporter: Peng Zhang
>            Priority: Major
>         Attachments: 03-config.yaml, 06-jobmanager-deployment.yaml, 
> 08-taskmanager-deployment.yaml, flink-ha.log
>
>
> We use a Flink K8s session cluster with HA mode (1 JobManager and 4 
> TaskManagers). When jobs are running in Flink and the JobManager is restarted, 
> the JobManager fails to recover the jobs from the checkpoint:
> {code}
> 2021-03-08 13:16:42,962 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to fetch 1 checkpoints from storage. 
> 2021-03-08 13:16:42,962 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to fetch 1 checkpoints from storage. 
> 2021-03-08 13:16:42,962 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 1. 
> 2021-03-08 13:16:43,014 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring 
> job 9a534b2e309b24f78866b65d94082ead from Checkpoint 1 @ 1615208258041 for 
> 9a534b2e309b24f78866b65d94082ead located at 
> s3a://zalando-stellar-flink-state-eu-central-1-staging/checkpoints/9a534b2e309b24f78866b65d94082ead/chk-1.
>  
> 2021-03-08 13:16:43,023 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No master 
> state to restore 
> 2021-03-08 13:16:43,024 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using failover strategy 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@58d927d2
>  for BrandCollectionTrackingJob (9a534b2e309b24f78866b65d94082ead). 
> 2021-03-08 13:16:43,046 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl      [] - JobManager 
> runner for job BrandCollectionTrackingJob (9a534b2e309b24f78866b65d94082ead) 
> was granted leadership with session id c258d8ce-69d3-49df-8bee-1b748d5bbe74 
> at akka.tcp://[email protected]:6123/user/rpc/jobmanager_2. 
> 2021-03-08 13:16:43,060 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [null] failed with 
> java.net.NoRouteToHostException: No route to host 
> 2021-03-08 13:16:43,060 WARN  akka.remote.ReliableDeliverySupervisor          
>              [] - Association with remote system 
> [akka.tcp://[email protected]:6123] has failed, address is now gated for 
> [50] ms. Reason: [Association failed with 
> [akka.tcp://[email protected]:6123]] Caused by: 
> [java.net.NoRouteToHostException: No route to host]
> {code}
> Attached is the log, and our configuration.
>  



