2021-01-08 09:47:31,636 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 9 of job 5e953fb772f9030c728e7c0498555ae2 expired before completing.
2021-01-08 09:47:31,637 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_265]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_265]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_265]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_265]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_265]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_265]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
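In the log above, the job fails immediately after a single checkpoint (9) expires. As a rough mental model, and not Flink's actual `CheckpointFailureManager` code, the sketch below counts consecutive checkpoint failures, resets the count on success, and fails the job once the count exceeds a tolerable threshold. The class and method names are hypothetical. In Flink 1.11 the threshold is set with `CheckpointConfig#setTolerableCheckpointFailureNumber`; when it is left unset it is treated as 0, which is consistent with what this log shows.

```java
/**
 * Simplified, hypothetical model of a "tolerable checkpoint failure" counter.
 * It is NOT Flink's CheckpointFailureManager; it only mirrors the idea that
 * consecutive failures are counted and a completed checkpoint resets the count.
 */
class TolerableFailureModel {
    private final int tolerableFailures;
    private int consecutiveFailures = 0;

    TolerableFailureModel(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Records a failed (e.g. expired) checkpoint; returns true if the job should now fail. */
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    /** A completed checkpoint resets the consecutive-failure count. */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    /**
     * Replays checkpoint outcomes (true = completed, false = failed) and returns
     * the index of the checkpoint that fails the job, or -1 if the job survives.
     */
    static int firstFatalCheckpoint(int tolerableFailures, boolean... outcomes) {
        TolerableFailureModel model = new TolerableFailureModel(tolerableFailures);
        for (int i = 0; i < outcomes.length; i++) {
            if (outcomes[i]) {
                model.onCheckpointSuccess();
            } else if (model.onCheckpointFailure()) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Threshold 0: the very first failed checkpoint fails the job.
        System.out.println(firstFatalCheckpoint(0, true, true, false)); // 2
        // Threshold 2: two failures in a row are tolerated, the third is not.
        System.out.println(firstFatalCheckpoint(2, false, false, true, false, false, false)); // 5
    }
}
```

Raising the threshold only buys time; if every checkpoint expires, the job still fails once the count is exceeded.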




-------- Forwarded Message --------
From: "旧城以西" <dty...@163.com>
Date: 2021-01-08 10:12:45
To: "user-zh@flink.apache.org" <user-zh@flink.apache.org>
Subject: Flink on k8s: how to specify the number of TaskManagers when submitting a job

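Hi all,
     I am running a Flink cluster deployed in session mode on k8s. The cluster has 3 TaskManagers, each with 8 slots. When I submit a job with parallelism 3, the job always runs on a single TaskManager, which causes data skew, and every checkpoint fails. Does Flink on k8s have a parameter similar to yarnslots (`-ys`) on YARN for controlling how many TaskManagers a job uses?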
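To illustrate how a parallelism-3 job can land entirely on one TaskManager even though 24 slots are free, the sketch below compares two hypothetical placement policies: filling one TaskManager before moving to the next, and handing out slots round-robin across TaskManagers. It assumes all operators share the default slot-sharing group, so parallelism 3 needs 3 slots. This is a toy model, not Flink's scheduler. Flink 1.11 does provide a `cluster.evenly-spread-out-slots` option intended to spread slots across all registered TaskManagers; check the configuration reference for your version before relying on it.

```java
import java.util.Arrays;

/**
 * Toy model of slot placement across TaskManagers (hypothetical, not Flink's
 * scheduler). Each method returns how many slots are taken on each TaskManager.
 */
class SlotPlacement {

    /** Fill TaskManager 0 completely, then TaskManager 1, and so on. */
    static int[] fillFirst(int taskManagers, int slotsPerTm, int slotsNeeded) {
        checkCapacity(taskManagers, slotsPerTm, slotsNeeded);
        int[] used = new int[taskManagers];
        int remaining = slotsNeeded;
        for (int tm = 0; tm < taskManagers && remaining > 0; tm++) {
            used[tm] = Math.min(slotsPerTm, remaining);
            remaining -= used[tm];
        }
        return used;
    }

    /** Hand out slots round-robin so the load is spread over all TaskManagers. */
    static int[] spreadOut(int taskManagers, int slotsPerTm, int slotsNeeded) {
        checkCapacity(taskManagers, slotsPerTm, slotsNeeded);
        int[] used = new int[taskManagers];
        for (int i = 0; i < slotsNeeded; i++) {
            used[i % taskManagers]++;
        }
        return used;
    }

    private static void checkCapacity(int taskManagers, int slotsPerTm, int slotsNeeded) {
        if (slotsNeeded > taskManagers * slotsPerTm) {
            throw new IllegalArgumentException("not enough slots: need " + slotsNeeded
                    + ", have " + taskManagers * slotsPerTm);
        }
    }

    public static void main(String[] args) {
        // The setup from the question: 3 TaskManagers x 8 slots, parallelism 3.
        System.out.println(Arrays.toString(fillFirst(3, 8, 3))); // [3, 0, 0]
        System.out.println(Arrays.toString(spreadOut(3, 8, 3))); // [1, 1, 1]
    }
}
```

The first result matches the behaviour described in the question; the second is what an even spread would look like.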


Flink TaskManager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125


Flink JobManager configuration:


jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first
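The TaskManager and JobManager files above differ in several keys, for example `taskmanager.numberOfTaskSlots` (8 vs 1) and `high-availability.storageDir` (`file:///tmp` vs `file:///data/ha`). Because each process reads its own `flink-conf.yaml`, such differences are easy to overlook. The hypothetical helper below diffs two files written in the flat one-`key: value`-per-line style used here; it does not handle nested YAML and is not part of Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical helper: diffs two flat "key: value" flink-conf.yaml texts. */
class FlinkConfDiff {

    /** Parses "key: value" lines, skipping blanks, '#' comments and lines without ':'. */
    static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : text.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int colon = line.indexOf(':'); // first colon separates key from value
            if (colon < 0) {
                continue;
            }
            conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return conf;
    }

    /** Returns "key: valueA | valueB" for every key that differs or is missing on one side. */
    static List<String> diff(Map<String, String> a, Map<String, String> b) {
        TreeSet<String> keys = new TreeSet<>(a.keySet());
        keys.addAll(b.keySet());
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            String va = a.get(key);
            String vb = b.get(key);
            if (va == null ? vb != null : !va.equals(vb)) {
                out.add(key + ": " + va + " | " + vb);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String tm = "taskmanager.numberOfTaskSlots: 8\nhigh-availability.storageDir: file:///tmp\n"
                + "jobmanager.rpc.port: 6123\n";
        String jm = "taskmanager.numberOfTaskSlots: 1\nhigh-availability.storageDir: file:///data/ha\n"
                + "jobmanager.rpc.port: 6123\nstate.backend: filesystem\n";
        diff(parse(tm), parse(jm)).forEach(System.out::println);
    }
}
```

A value of `null` in the output means the key is absent from that file, e.g. `state.backend` appears only in the JobManager configuration.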


flink-taskmanager.yml


---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
        - name: registrysecret
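With `replicas: 3` in this Deployment and `taskmanager.numberOfTaskSlots: 8` in the TaskManager configuration, the session cluster offers 3 × 8 = 24 slots, while a parallelism-3 job needs only 3 (assuming one slot-sharing group). The fewest TaskManagers that can host the job is ceil(parallelism / slots per TaskManager): with 8 slots that is 1, so nothing forces the job onto several pods. With 1 slot per TaskManager it becomes 3, so each subtask must run on a different pod. The helper names below are hypothetical; the sketch only illustrates this arithmetic.

```java
/** Hypothetical slot arithmetic for a session cluster (one slot-sharing group assumed). */
class SlotMath {

    /** Total slots offered by the cluster: TaskManager pods x slots per TaskManager. */
    static int totalSlots(int replicas, int slotsPerTm) {
        return replicas * slotsPerTm;
    }

    /** Fewest TaskManagers that must host the job: ceil(parallelism / slotsPerTm). */
    static int minTaskManagersUsed(int parallelism, int slotsPerTm) {
        return (parallelism + slotsPerTm - 1) / slotsPerTm;
    }

    public static void main(String[] args) {
        int replicas = 3;    // spec.replicas in flink-taskmanager.yml
        int parallelism = 3; // parallelism used when submitting the job
        for (int slotsPerTm : new int[] {8, 1}) {
            System.out.printf("slots/TM=%d total=%d min TMs used=%d%n",
                    slotsPerTm, totalSlots(replicas, slotsPerTm),
                    minTaskManagersUsed(parallelism, slotsPerTm));
        }
    }
}
```

Running it prints `slots/TM=8 total=24 min TMs used=1` followed by `slots/TM=1 total=3 min TMs used=3`. The trade-off of one slot per TaskManager is that the cluster's total capacity shrinks to the number of pods.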


Flink job code:
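import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Get the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable exactly-once checkpointing every 60 seconds
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
// Set the time characteristic first: in Flink 1.11 this call resets the
// watermark interval (to 200 ms for event time), overriding an earlier value
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit watermarks every 100 ms
env.getConfig().setAutoWatermarkInterval(100);
// Restart up to 5 times, waiting 1 minute between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.minutes(1)));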
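The job code configures `RestartStrategies.fixedDelayRestart(5, Time.minutes(1))`, i.e. up to 5 restart attempts with a 1-minute delay between them. In this setup, each global failure caused by an expired checkpoint consumes one attempt until none remain. The sketch below is a simplified, hypothetical model of a fixed-delay strategy (not Flink's implementation) that reports whether another restart is allowed and how much time has been spent waiting in delays.

```java
import java.time.Duration;

/** Hypothetical, simplified model of a fixed-delay restart strategy. */
class FixedDelayRestartModel {
    private final int maxAttempts;
    private final Duration delay;
    private int failures = 0;

    FixedDelayRestartModel(int maxAttempts, Duration delay) {
        this.maxAttempts = maxAttempts;
        this.delay = delay;
    }

    /** Records a job failure; returns true if another restart is allowed. */
    boolean onFailure() {
        failures++;
        return failures <= maxAttempts;
    }

    /** Total time spent in restart delays for the restarts granted so far. */
    Duration totalDelay() {
        return delay.multipliedBy(Math.min(failures, maxAttempts));
    }

    public static void main(String[] args) {
        // Mirrors fixedDelayRestart(5, Time.minutes(1)) from the job code.
        FixedDelayRestartModel model = new FixedDelayRestartModel(5, Duration.ofMinutes(1));
        int failure = 0;
        while (model.onFailure()) {
            failure++;
            System.out.println("failure " + failure + ": restart after " + model.delay);
        }
        System.out.println("failure " + (failure + 1) + ": job fails permanently, "
                + "time spent in delays: " + model.totalDelay());
    }
}
```

Under this model, the sixth failure ends the job after at least 5 minutes of cumulative restart delay, not counting the time each attempt runs before its checkpoint expires.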
