Fw: flink on k8s: how to specify the number of TaskManagers when submitting a job

2021-01-07 Thread 西

2021-01-08 09:47:31,636 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 9 of job 5e953fb772f9030c728e7c0498555ae2 expired before completing.
2021-01-08 09:47:31,637 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_265]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_265]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_265]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_265]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_265]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_265]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]




Forwarded message
From: "旧城以西"
Date: 2021-01-08 10:12:45
To: "user-zh@flink.apache.org"
Subject: flink on k8s: how to specify the number of TaskManagers when submitting a job


flink on k8s: how to specify the number of TaskManagers when submitting a job

2021-01-07 Thread 西
Hi everyone:
I am currently running a Flink cluster deployed with Flink on k8s in session mode. The cluster has 3 TaskManagers, each with 8 slots. When I submit a job I set the parallelism to 3, but the job always ends up on a single TaskManager, which causes data skew, and the checkpoints never succeed. Is there a parameter for Flink on k8s, similar to the yarn slots setting on YARN, that controls how many TaskManagers a job uses?
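One knob that may help here (a sketch, not from the original thread; the option exists since Flink 1.10) is cluster.evenly-spread-out-slots, which tells the default scheduler to spread slot requests across all registered TaskManagers instead of filling one TaskManager first. Reducing taskmanager.numberOfTaskSlots has a similar effect, since a parallelism of 3 then cannot fit on a single TaskManager. In flink-conf.yaml that could look like:

cluster.evenly-spread-out-slots: true
# alternatively (or additionally), fewer slots per TaskManager:
# taskmanager.numberOfTaskSlots: 2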


Flink TaskManager configuration
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125


Flink JobManager configuration


jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first


flink-taskmanager.yml


---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
      - name: registrysecret


Flink code
// Obtain the Flink stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use exactly-once checkpointing
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Checkpoint every 60 seconds
env.enableCheckpointing(1000 * 60);
// Interval at which watermarks are generated
env.getConfig().setAutoWatermarkInterval(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, org.apache.flink.api.common.time.Time.minutes(1)));
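Given the "expired before completing" and "Exceeded checkpoint tolerable failure threshold" messages in the forwarded log above, the checkpoint settings can also be relaxed so the job survives a few slow checkpoints while the slot-spreading issue is being addressed. A minimal sketch (the values are illustrative, not from the original mail):

// Give each checkpoint more time before it is declared expired
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
// Tolerate a few checkpoint failures before failing the whole job
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// Leave a pause between checkpoints so the job can make progress in between
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30 * 1000L);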


Flink cluster deployed via k8s

2021-01-06 Thread 西
For a Flink cluster deployed with a k8s session, with three nodes, why does a submitted job occupy only one TaskManager node? As it stands, this causes data skew and every checkpoint fails.



Using k8s for Flink high availability

2020-09-07 Thread 西
Could anyone advise how to achieve Flink HA with a k8s session deployment? If anyone has done this, please share the corresponding configuration files.
According to the official documentation, configuring HA requires modifying the masters file; when deploying on k8s, how should the masters file be modified?
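For reference, a sketch (not from the original thread) of the HA-related part of flink-conf.yaml for a session cluster on k8s with ZooKeeper. With ZooKeeper-based HA the active JobManager is discovered through ZooKeeper, so the masters file used by the standalone start-cluster.sh scripts is typically not the deciding piece when the JobManager pods are created by a Deployment; the storage directory, however, must be shared durable storage reachable from every pod, not a local path. The cluster-id and storage URI below are illustrative:

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink-session-1
# must be shared, durable storage reachable from every pod (HDFS, S3, NFS, ...)
high-availability.storageDir: hdfs:///flink/ha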