Re: Is there no Spark-style cache operation for Flink operators?

2021-01-07 Posted by 张锴
Intermediate results can be kept in state.
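
For illustration, a minimal sketch (my own, not from this thread) of what keeping an intermediate result in keyed state looks like with the DataStream API, so a value is computed once per key and reused afterwards:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CachedMapFunction extends KeyedProcessFunction<String, String, Integer> {
    private transient ValueState<Integer> cached;

    @Override
    public void open(Configuration parameters) {
        cached = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cached", Integer.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Integer> out) throws Exception {
        Integer v = cached.value();
        if (v == null) {
            // the expensive computation runs only once per key ...
            v = value.charAt(0) + 1;
            cached.update(v);
        }
        // ... and later elements of the same key reuse the stored value
        out.collect(v);
    }
}

Note that this applies to streaming jobs; in a DataSet job like the one quoted below there is no cache(), and each sink re-executes the pipeline.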

李继 wrote on Thu, Jan 7, 2021 at 5:42 PM:

> Hi, when an operator's result is used multiple times, how can I cache it, similar to Spark's cache operation?
>
> val env = getBatchEnv
> val ds = env.fromElements("a","b","c")
>
> val ds2 = ds.map(x=>{
>   println("map op")
>   x.charAt(0).toInt+1
> })
>
> // this prints "map op" three times
> ds2.print()
>
> // this prints "map op" three more times
> ds2.filter(_>100).print()
>


Re: Re: sql-client won't start after configuring Hive

2021-01-07 Posted by amenhub
OK, thanks.



 
From: Rui Li
Sent: 2021-01-08 11:42
To: user-zh
Subject: Re: Re: sql-client won't start after configuring Hive
Hi,
 
With the Table API you can set Flink's security options to specify the principal and keytab [1].
For the SQL Client, try doing a manual kinit before starting it and see whether that works.
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems
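
For reference, a minimal flink-conf.yaml sketch of the options meant above (keys from the linked docs; keytab path and principal are placeholders):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM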
 
On Fri, Jan 8, 2021 at 10:06 AM amenhub  wrote:
 
> Ah? It is indeed a Kerberos-enabled HMS. Is there any other workaround?
>
>
>
>
> From: 叶贤勋
> Sent: 2021-01-08 10:03
> To: user-zh@flink.apache.org
> Subject: Re: sql-client won't start after configuring Hive
> Is the HMS secured with Kerberos?
> The community Hive connector does not currently support accessing a Kerberos-secured HMS.
>
>
>
>
> On Jan 7, 2021 at 18:39, amenhub wrote:
> After configuring the Hive catalog in sql-client-defaults.yaml, the SQL
> client fails to start via sql-client.sh embedded.
>
> Error message: https://imgchr.com/i/smQrlj
>
> Flink version: 1.12
> Hive version: 3.1.0
>
>
>
>
 
-- 
Best regards!
Rui Li


Re: Chinese characters in a WHERE condition cause queried fields to come back as Unicode escapes

2021-01-07 Posted by spike
I've run into the same problem, but there doesn't seem to be an issue tracking it.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.12.0 native K8s won't start

2021-01-07 Posted by yzxs
1. The job is submitted with the following command:
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1 \
    -Dkubernetes.container.image.pull-policy=Always \
    -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
    local:///opt/flink/usrlib/WordCount.jar

2. After submission, the pod fails and keeps restarting; kubectl logs shows the following error:
/docker-entrypoint.sh: 125: exec: native-k8s: not found

3. I checked the image's docker-entrypoint.sh script and there is no native-k8s command in it. The image is built on the latest Flink image; the Dockerfile is as follows:
FROM flink:latest
RUN mkdir -p /opt/flink/usrlib
COPY ./WordCount.jar /opt/flink/usrlib/WordCount.jar

4. Pod describe output:
Name: my-first-application-cluster-59c4445df4-4ss2m
Namespace:default
Priority: 0
Node: minikube/192.168.64.2
Start Time:   Wed, 23 Dec 2020 17:06:02 +0800
Labels:   app=my-first-application-cluster
  component=jobmanager
  pod-template-hash=59c4445df4
  type=flink-native-kubernetes
Annotations:  <none>
Status:   Running
IP:   172.17.0.3
IPs:
  IP:   172.17.0.3
Controlled By:  ReplicaSet/my-first-application-cluster-59c4445df4
Containers:
  flink-job-manager:
Container ID: 
docker://b8e5759488af5fd3e3273f69d42890d9750d430cbd6e18b1d024ab83293d0124
Image: registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1
Image ID: 
docker-pullable://registry.cn-shenzhen.aliyuncs.com/syni_test/flink@sha256:53a2cec0d0a532aa5d79c241acfdd13accb9df78eb951eb4e878485174186aa8
Ports: 8081/TCP, 6123/TCP, 6124/TCP
Host Ports:0/TCP, 0/TCP, 0/TCP
Command:
  /docker-entrypoint.sh
Args:
  native-k8s
  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824
-Xms1073741824 -XX:MaxMetaspaceSize=268435456
-Dlog.file=/opt/flink/log/jobmanager.log
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
-D jobmanager.memory.off-heap.size=134217728b -D
jobmanager.memory.jvm-overhead.min=201326592b -D
jobmanager.memory.jvm-metaspace.size=268435456b -D
jobmanager.memory.heap.size=1073741824b -D
jobmanager.memory.jvm-overhead.max=201326592b
State:  Waiting
  Reason:   CrashLoopBackOff
Last State: Terminated
  Reason:   Error
  Exit Code:127
  Started:  Wed, 23 Dec 2020 17:37:28 +0800
  Finished: Wed, 23 Dec 2020 17:37:28 +0800
Ready:  False
Restart Count:  11
Limits:
  cpu: 1
  memory:  1600Mi
Requests:
  cpu: 1
  memory:  1600Mi
Environment:
  _POD_IP_ADDRESS:   (v1:status.podIP)
Mounts:
  /opt/flink/conf from flink-config-volume (rw)
  /var/run/secrets/kubernetes.io/serviceaccount from default-token-9hdqt
(ro)
Conditions:
  Type  Status
  Initialized   True 
  Ready False 
  ContainersReady   False 
  PodScheduled  True 
Volumes:
  flink-config-volume:
Type:  ConfigMap (a volume populated by a ConfigMap)
Name:  flink-config-my-first-application-cluster
Optional:  false
  default-token-9hdqt:
Type:Secret (a volume populated by a Secret)
SecretName:  default-token-9hdqt
Optional:false
QoS Class:   Guaranteed
Node-Selectors:  <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason     Age                From               Message
  ----     ------     ----               ----               -------
  Normal   Scheduled  15d                default-scheduler  Successfully assigned default/my-first-application-cluster-59c4445df4-4ss2m to minikube
  Normal   Pulled     15d                kubelet            Successfully pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in 513.7913ms
  Normal   Pulled     15d                kubelet            Successfully pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in 374.1125ms
  Normal   Pulled     15d                kubelet            Successfully pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in 360.6719ms
  Normal   Created    15d (x4 over 15d)  kubelet            Created container flink-job-manager
  Normal   Started    15d (x4 over 15d)  kubelet            Started container flink-job-manager
  Normal   Pulled     15d                kubelet            Successfully pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in 374.2637ms
  Normal   Pulling    15d (x5 over 15d)  kubelet            Pulling image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1"
  Warning  BackOff    15d (x141

Re: Re: sql-client won't start after configuring Hive

2021-01-07 Posted by Rui Li
Hi,

With the Table API you can set Flink's security options to specify the principal and keytab [1].
For the SQL Client, try doing a manual kinit before starting it and see whether that works.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems

On Fri, Jan 8, 2021 at 10:06 AM amenhub  wrote:

> Ah? It is indeed a Kerberos-enabled HMS. Is there any other workaround?
>
>
>
>
> From: 叶贤勋
> Sent: 2021-01-08 10:03
> To: user-zh@flink.apache.org
> Subject: Re: sql-client won't start after configuring Hive
> Is the HMS secured with Kerberos?
> The community Hive connector does not currently support accessing a Kerberos-secured HMS.
>
>
>
>
> On Jan 7, 2021 at 18:39, amenhub wrote:
> After configuring the Hive catalog in sql-client-defaults.yaml, the SQL
> client fails to start via sql-client.sh embedded.
>
> Error message: https://imgchr.com/i/smQrlj
>
> Flink version: 1.12
> Hive version: 3.1.0
>
>
>
>

-- 
Best regards!
Rui Li


Re: Flink 1.11.2 real-time job failing with: is running beyond physical memory limits. Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing container

2021-01-07 Posted by Yun Tang
Hi,

This is likely off-heap memory overuse. For fixes, see the recent Chinese-community article 《详解 Flink 容器化环境下的 OOM Killed》 [1]; I'd suggest first increasing the jvm-overhead related options.
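
For example, a sketch of the relevant flink-conf.yaml keys (values here are placeholders, not a recommendation):

taskmanager.memory.jvm-overhead.min: 1g
taskmanager.memory.jvm-overhead.max: 2g
taskmanager.memory.jvm-overhead.fraction: 0.2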

[1] 
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247490197=1=b0893a9bf12fbcae76852a156302de95

Best,
Yun Tang

From: Yang Peng 
Sent: Thursday, January 7, 2021 12:24
To: user-zh 
Subject: Flink 1.11.2 real-time job failing with: is running beyond physical memory
limits. Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB
virtual memory used. Killing container

Hi,

 
Hi all, a question: we have a real-time job on Flink 1.11.2 using the RocksDB state backend. It recently alerted with the physical-memory-exceeded kill below. Monitoring shows TaskManager heap usage never exceeded its limit, direct memory usage stayed within a stable range without exceeding limits, and no OOM was reported. Does that mean the overuse is in non-heap memory? The full error is:

Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 180421 180362 180362 180362 (java) 258262921 59979106 30306209792
6553277 /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC
-Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D
taskmanager.memory.network.max=1073741824b -D
taskmanager.memory.network.min=1073741824b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=12750684160b -D
taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=11408506880b -D
taskmanager.memory.task.off-heap.size=0b --configDir .
-Djobmanager.rpc.address=flink-cm8.jd.163.org -Dweb.port=0
-Dweb.tmpdir=/tmp/flink-web-9197a884-03b9-4865-a0a0-0b6a1c295f2c
-Djobmanager.rpc.port=33656 -Drest.address=flink-cm8.jd.163.org
-Dsecurity.kerberos.login.keytab=/mnt/ssd/3/yarn/local/usercache/portal/appcache/application_1603181034156_0137/container_e06_1603181034156_0137_01_01/krb5.keytab
|- 180362 180360 180362 180362 (bash) 0 2 116011008 353 /bin/bash -c
/usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC -Xmx11542724608
-Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D
taskmanager.memory.network.max=1073741824b -D
taskmanager.memory.network.min=1073741824b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=12750684160b -D
taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=11408506880b -D
taskmanager.memory.task.off-heap.size=0b --configDir .
-Djobmanager.rpc.address='flink-cm8.jd.163.org' -Dweb.port='0'
-Dweb.tmpdir='/tmp/flink-web-9197a884-03b9-4865-a0a0-0b6a1c295f2c'
-Djobmanager.rpc.port='33656' -Drest.address='flink-cm8.jd.163.org'
-Dsecurity.kerberos.login.keytab='/mnt/ssd/3/yarn/local/usercache/portal/appcache/application_1603181034156_0137/container_e06_1603181034156_0137_01_01/krb5.keytab'
1> 
/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.out
2> 
/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.err

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

2021-01-07 11:51:00,781 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
Source: 银河SDK原始日志 (18/90) (51ac2f29df472d001ce9b4307636ac1c) switched
from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@1aad00fa.
java.lang.Exception: Container
[pid=180362,containerID=container_e06_1603181034156_0137_01_02] is
running beyond physical memory limits. Current usage: 25.0 GB of 25 GB
physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing
container.
Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 180421 180362 180362 180362 (java) 258262921 59979106 30306209792
6553277 /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC
-Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D

Re: Re: Re: Re: Monitoring Flink jobs with Grafana: how to choose metrics for alerting

2021-01-07 Posted by Yun Tang
Since numRestarts is a cumulative value, you have to compare the current value with the previous one and check whether it has increased, in order to tell whether a failover occurred.

Also, I don't recommend judging a Flink job's state from the YARN application state: if the job has a restart strategy configured, the YARN application stays RUNNING even while the job keeps failing over, so the problem goes unnoticed.
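
For instance, with the metrics exported to Prometheus, an alert expression along these lines would catch such increases (the exact metric name depends on your reporter and scope configuration, so treat it as an assumption):

increase(flink_jobmanager_job_numRestarts[5m]) > 0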

Best,
Yun Tang

From: bradyMk 
Sent: Thursday, January 7, 2021 16:38
To: user-zh@flink.apache.org 
Subject: Re: Re: Re: Re: Monitoring Flink jobs with Grafana: how to choose metrics for alerting

OK, I'll look into it. Thanks for the guidance!



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to submit SQL jobs

2021-01-07 Posted by 林影
So Zeppelin 0.9 can already act as a job server for Flink?
Last time, at the Yunqi conference, Jeff Zhang said this was on the roadmap. Is it already usable for that?

Peihui He wrote on Fri, Jan 8, 2021 at 9:21 AM:

> You can try Zeppelin 0.9
> http://zeppelin.apache.org/
>
>
> jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:
>
> > We still submit our SQL jobs with Jark's flink-sql-submit project. A few questions:
> > 1. Is there a better way to submit SQL jobs?
> > 2. Since Flink 1.12 unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
> > 3. Roughly when will the community release the SQL Client Gateway, and will it be production-ready?
>


Fw: Flink on K8s: how to specify the number of TaskManagers when submitting a job

2021-01-07 Posted by 旧城以西

2021-01-08 09:47:31,636 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 9 of job 5e953fb772f9030c728e7c0498555ae2 expired before completing.
2021-01-08 09:47:31,637 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_265]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_265]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_265]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_265]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_265]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_265]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]




Forwarded message:
From: "旧城以西"
Sent: 2021-01-08 10:12:45
To: "user-zh@flink.apache.org"
Subject: Flink on K8s: how to specify the number of TaskManagers when submitting a job

Hi all:
    I'm running a Flink cluster deployed with Flink on K8s in session mode, with 3 TaskManagers of 8 slots each. I submit my job with parallelism 3, and it always runs entirely on one TaskManager, which causes data skew, and checkpoints never succeed. On Flink on K8s, is there a parameter like YARN's slot settings to control how many TaskManagers are used?


Flink TaskManager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125


Flink JobManager configuration:


jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first


flink-taskmanager.yml


---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
      - name: registrysecret


Flink code:
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 

Flink on K8s: how to specify the number of TaskManagers when submitting a job

2021-01-07 Posted by 旧城以西
Hi all:
    I'm running a Flink cluster deployed with Flink on K8s in session mode, with 3 TaskManagers of 8 slots each. I submit my job with parallelism 3, and it always runs entirely on one TaskManager, which causes data skew, and checkpoints never succeed. On Flink on K8s, is there a parameter like YARN's slot settings to control how many TaskManagers are used?


Flink TaskManager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125


Flink JobManager configuration:


jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first


flink-taskmanager.yml


---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
      - name: registrysecret


Flink code:
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Set the checkpoint interval
env.enableCheckpointing(1000 * 60);
// Set the watermark generation interval
env.getConfig().setAutoWatermarkInterval(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
        org.apache.flink.api.common.time.Time.minutes(1)));


Re: Re: sql-client won't start after configuring Hive

2021-01-07 Posted by amenhub
Ah? It is indeed a Kerberos-enabled HMS. Is there any other workaround?



 
From: 叶贤勋
Sent: 2021-01-08 10:03
To: user-zh@flink.apache.org
Subject: Re: sql-client won't start after configuring Hive
Is the HMS secured with Kerberos?
The community Hive connector does not currently support accessing a Kerberos-secured HMS.
 
 
 
 
On Jan 7, 2021 at 18:39, amenhub wrote:
After configuring the Hive catalog in sql-client-defaults.yaml, the SQL
client fails to start via sql-client.sh embedded.

Error message: https://imgchr.com/i/smQrlj

Flink version: 1.12
Hive version: 3.1.0
 
 
 


Re: sql-client won't start after configuring Hive

2021-01-07 Posted by 叶贤勋
Is the HMS secured with Kerberos?
The community Hive connector does not currently support accessing a Kerberos-secured HMS.




On Jan 7, 2021 at 18:39, amenhub wrote:
After configuring the Hive catalog in sql-client-defaults.yaml, the SQL
client fails to start via sql-client.sh embedded.

Error message: https://imgchr.com/i/smQrlj

Flink version: 1.12
Hive version: 3.1.0





Re: How to submit SQL jobs

2021-01-07 Posted by LakeShen
On our side there is actually an underlying Flink jar job; we pass the Flink SQL code and the job's configuration into that jar as parameters. Of course there are plenty of other ways to do this nowadays; the link above is also worth a look.
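
As a rough illustration of that wrapper-jar approach (my own sketch, not the actual implementation; the class name and argument handling are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlSubmitMain {
    public static void main(String[] args) {
        // one SQL statement per program argument, executed through the Table API
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        for (String statement : args) {
            tEnv.executeSql(statement);
        }
    }
}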

Best,
LakeShen

Peihui He wrote on Fri, Jan 8, 2021 at 9:21 AM:

> You can try Zeppelin 0.9
> http://zeppelin.apache.org/
>
>
> jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:
>
> > We still submit our SQL jobs with Jark's flink-sql-submit project. A few questions:
> > 1. Is there a better way to submit SQL jobs?
> > 2. Since Flink 1.12 unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
> > 3. Roughly when will the community release the SQL Client Gateway, and will it be production-ready?
>


Re: How to submit SQL jobs

2021-01-07 Posted by Peihui He
You can try Zeppelin 0.9
http://zeppelin.apache.org/
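
For context, running Flink SQL in a Zeppelin 0.9 note looks roughly like this (a sketch based on Zeppelin's Flink interpreter; the table name is a placeholder):

%flink.ssql
SELECT word, COUNT(*) AS cnt
FROM my_source_table
GROUP BY word;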


jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:

> We still submit our SQL jobs with Jark's flink-sql-submit project. A few questions:
> 1. Is there a better way to submit SQL jobs?
> 2. Since Flink 1.12 unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
> 3. Roughly when will the community release the SQL Client Gateway, and will it be production-ready?


Re: How to submit SQL jobs

2021-01-07 Posted by Sebastian Liu
You can try: https://github.com/ververica/flink-sql-gateway

If you have related requirements, feel free to open an issue.

jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:

> We still submit our SQL jobs with Jark's flink-sql-submit project. A few questions:
> 1. Is there a better way to submit SQL jobs?
> 2. Since Flink 1.12 unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
> 3. Roughly when will the community release the SQL Client Gateway, and will it be production-ready?



-- 

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


How do I downgrade Avro to 1.8.2 on Flink 1.12?

2021-01-07 Posted by Dacheng
Hi,


Hi all,


Problems encountered while downgrading Avro
The 1.12 release notes mention that Avro is now at 1.10 but can be downgraded to 1.8.2 if needed:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/release-notes/flink-1.12.html#upgrade-to-avro-version-1100-from-182-flink-18192
When I tried the downgrade, I ran into the problems below.


1. Depending directly on Avro 1.8.2 makes AvroSchemaConverter#nullableSchema in flink-avro fail:
java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z
at 
org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:448)
Comparing Avro 1.8.2 with 1.10.0, the method Schema.isNullable() simply does not exist in 1.8.2.


2. To work around problem 1, I tried relocating the org.apache.avro dependency with the maven-shade-plugin.
The pom is at https://paste.ubuntu.com/p/SMYHy66bc6/
But it fails with:
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-shade-plugin:3.2.0:shade (shade-deps) on project 
leyan-flink: Error creating shaded jar: Problem shading JAR 
/Users/dongzhi/.m2/repository/org/glassfish/jersey/core/jersey-common/2.30/jersey-common-2.30.jar
 entry 
META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class:
 org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class 
META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class:
 UnsupportedOperationException -> [Help 1]
[ERROR]
I haven't found a solution to this yet, hence asking the community.
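
One workaround that may apply (an assumption on my part, not verified against this build): the ASM error comes from shade 3.2.0 choking on the Java 11 multi-release classes under META-INF/versions, so upgrading the plugin to a release with a newer ASM, or filtering those entries out, is worth a try:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <configuration>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <!-- skip multi-release class files the older ASM cannot parse -->
          <exclude>META-INF/versions/**</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
</plugin>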


Why downgrade Avro: a problem integrating Flink SQL with Hive
Also, Avro 1.10 worked fine for me before.
But when I added the Hive dependencies in order to use Hive built-in functions in SQL, with the following dependencies:
flink-avro-confluent-registry-1.12.0
flink-connector-hive_2.11-1.12.0(provided)
hive-exec 2.3.4 (provided)
hadoop-mapreduce-client-core 2.6.0-cdh5.13.3 <= added because I hit the same problem as
http://apache-flink.147419.n8.nabble.com/Flink-td7866.html
The extra jars under flink/lib are:
flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar
hadoop-mapreduce-client-core-2.6.0-cdh5.13.3.jar
I got an error similar to https://github.com/confluentinc/schema-registry/issues/1432
and therefore tried downgrading Avro to 1.8.2:
java.lang.NoSuchFieldError: FACTORY
at org.apache.avro.Schemas.toString(Schemas.java:36) 
~[trade-event.jar:1.12.0] 
However, analyzing the whole project with mvn dependency:tree shows nothing using Avro 1.8.2 anywhere.
The Flink application is deployed on-yarn per-job; the production CDH cluster ships a fairly old Avro version, and I wonder whether that is how Avro 1.8.2 ends up being used somewhere.


Thanks

How to submit SQL jobs

2021-01-07 Posted by jiangjiguang719
We still submit our SQL jobs with Jark's flink-sql-submit project. A few questions:
1. Is there a better way to submit SQL jobs?
2. Since Flink 1.12 unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
3. Roughly when will the community release the SQL Client Gateway, and will it be production-ready?

Flink 1.12 fails when triggering a savepoint

2021-01-07 Posted by 赵一旦
The error is as follows:
java.lang.IllegalArgumentException: Can not set long field com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
    at sun.reflect.UnsafeLongFieldAccessorImpl.set(UnsafeLongFieldAccessorImpl.java:80)
    at java.lang.reflect.Field.set(Field.java:764)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:409)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)


The code at the failure site, per the stack trace, is:

try {
    for (int i = 0; i < numFields; i++) {
        boolean isNull = source.readBoolean();

        if (fields[i] != null) {
            if (isNull) {
                fields[i].set(target, null); // fails here: it sets null, but the field is a primitive long, not the boxed type
            } else {
                Object field = fieldSerializers[i].deserialize(source);
                fields[i].set(target, field);
            }
        } else if (!isNull) {
            // read and dump a pre-existing field value
            fieldSerializers[i].deserialize(source);
        }
    }
} catch (IllegalAccessException e) {
    throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}
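
Judging from the inline comment, the serialized data contains a null marker for a field that is a primitive long in the current class. A sketch of the kind of change that avoids this (AbstractDrRecord is the user's own class; treat this as an assumption, not a confirmed fix):

// Before: a primitive field can never hold the null read back from the savepoint
// public long timestamp;
// After: the boxed type lets the PojoSerializer restore the null marker
public Long timestamp;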


sql-client won't start after configuring Hive

2021-01-07 Posted by amenhub
After configuring the Hive catalog in sql-client-defaults.yaml, the SQL
client fails to start via sql-client.sh embedded.

Error message: https://imgchr.com/i/smQrlj

Flink version: 1.12
Hive version: 3.1.0





Is there no Spark-style cache operation for Flink operators?

2021-01-07 Posted by 李继
Hi, when an operator's result is used multiple times, how can I cache it, similar to Spark's cache operation?

val env = getBatchEnv
val ds = env.fromElements("a","b","c")

val ds2 = ds.map(x=>{
  println("map op")
  x.charAt(0).toInt+1
})

// this prints "map op" three times
ds2.print()

// this prints "map op" three more times
ds2.filter(_>100).print()


Re: Re: Re: Re: Monitoring Flink jobs with Grafana: how to choose metrics for alerting

2021-01-07 Posted by bradyMk
OK, I'll look into it. Thanks for the guidance!



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re: Monitoring Flink jobs with Grafana: how to choose metrics for alerting

2021-01-07 Posted by jiangjiguang719
1. Yes, it's developed in-house.
2. Scheduled polling of
http://hadoop.apache.org/docs/r2.8.5/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API
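
For example, a periodic check can be a plain HTTP call against the Cluster Applications API from the link above (host, port, and filters here are placeholders):

curl 'http://<resourcemanager-host>:8088/ws/v1/cluster/apps?states=RUNNING&applicationTypes=Apache%20Flink'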





On 2021-01-07 16:15:25, "bradyMk" wrote:
>Hi~
>A couple of questions:
>1) Is the real-time compute platform you mentioned developed in-house?
>2) How do you implement calling YARN's REST API every minute to get the job status? Is it a scheduled script? I've never called the YARN REST API and don't yet know how...
>
>
>
>-
>Best Wishes
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Monitoring Flink jobs with Grafana: how to choose metrics for alerting

2021-01-07 Posted by bradyMk
Hi, Yun Tang~

I don't quite understand why monitoring this metric requires maintaining state. What state would that be, and how should it be maintained?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Monitoring Flink jobs with Grafana: how to choose metrics for alerting

2021-01-07 Posted by bradyMk
Hi~
A couple of questions:
1) Is the real-time compute platform you mentioned developed in-house?
2) How do you implement calling YARN's REST API every minute to get the job status? Is it a scheduled script? I've never called the YARN REST API and don't yet know how...



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
