Re: Don't Flink operators have something like Spark's cache operation?
You can keep intermediate results in state.

李继 wrote on Thu, Jan 7, 2021 at 5:42 PM:
> Hi, when an operator's output is used multiple times, how can I cache it, similar to Spark's cache operation?
>
> val env = getBatchEnv
> val ds = env.fromElements("a","b","c")
>
> val ds2 = ds.map(x=>{
>   println("map op")
>   x.charAt(0).toInt+1
> })
>
> // this prints "map op" three times
> ds2.print()
>
> // this prints "map op" another three times
> ds2.filter(_>100).print()
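A minimal sketch of one possible workaround (an assumption, not part of the reply above): in the batch DataSet API every print() triggers a separate job execution, so the map re-runs each time; collecting the result once and reusing the local Seq avoids the recomputation.

import org.apache.flink.api.scala._

object CacheSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds2 = env.fromElements("a", "b", "c").map { x =>
      println("map op")            // runs only during the single collect() execution
      x.charAt(0).toInt + 1
    }
    val cached = ds2.collect()     // triggers exactly one job execution
    println(cached)                // reuse without re-running the pipeline
    println(cached.filter(_ > 100))
  }
}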
Re: Re: sql-client won't start after configuring Hive
OK, thanks.

From: Rui Li
Sent: 2021-01-08 11:42
To: user-zh
Subject: Re: Re: sql-client won't start after configuring Hive

Hi,
With the Table API you can set Flink's security options to specify the principal and keytab [1].
For the SQL client, try running kinit manually before starting it and see whether that works.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems

On Fri, Jan 8, 2021 at 10:06 AM amenhub wrote:
> Oh? The HMS does have Kerberos enabled. Is there any other workaround?
>
> From: 叶贤勋
> Sent: 2021-01-08 10:03
> To: user-zh@flink.apache.org
> Subject: Re: sql-client won't start after configuring Hive
> Is the HMS secured with Kerberos?
> The community Hive connector currently does not support accessing a Kerberized HMS.
>
> On Jan 7, 2021, 18:39, amenhub wrote:
> After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh embedded.
>
> Error message: https://imgchr.com/i/smQrlj
>
> flink version: 1.12
> hive version: 3.1.0

--
Best regards!
Rui Li
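A sketch of the security options referenced in [1] (the keytab path and principal are placeholders, and this is an assumption about the setup rather than the poster's actual configuration):

# flink-conf.yaml: authenticate against Kerberized external systems via keytab
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: user@EXAMPLE.COM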
Re: Using Chinese characters in a WHERE condition causes queried fields to come back as Unicode escapes
I ran into the same problem, but it looks like no issue is tracking it.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink 1.12.0 native k8s won't start
1. The job is submitted with the following command:

./bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=my-first-application-cluster \
  -Dkubernetes.container.image=registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1 \
  -Dkubernetes.container.image.pull-policy=Always \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
  local:///opt/flink/usrlib/WordCount.jar

2. After submission the pod keeps failing to start. kubectl logs shows the following error:

/docker-entrypoint.sh: 125: exec: native-k8s: not found

3. I checked the docker-entrypoint.sh script in the image and it has no native-k8s command. The image is built on top of the latest Flink image; the Dockerfile is:

FROM flink:latest
RUN mkdir -p /opt/flink/usrlib
COPY ./WordCount.jar /opt/flink/usrlib/WordCount.jar

4. The describe output of the pod:

Name: my-first-application-cluster-59c4445df4-4ss2m
Namespace: default
Priority: 0
Node: minikube/192.168.64.2
Start Time: Wed, 23 Dec 2020 17:06:02 +0800
Labels: app=my-first-application-cluster
        component=jobmanager
        pod-template-hash=59c4445df4
        type=flink-native-kubernetes
Annotations:
Status: Running
IP: 172.17.0.3
IPs:
  IP: 172.17.0.3
Controlled By: ReplicaSet/my-first-application-cluster-59c4445df4
Containers:
  flink-job-manager:
    Container ID: docker://b8e5759488af5fd3e3273f69d42890d9750d430cbd6e18b1d024ab83293d0124
    Image: registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1
    Image ID: docker-pullable://registry.cn-shenzhen.aliyuncs.com/syni_test/flink@sha256:53a2cec0d0a532aa5d79c241acfdd13accb9df78eb951eb4e878485174186aa8
    Ports: 8081/TCP, 6123/TCP, 6124/TCP
    Host Ports: 0/TCP, 0/TCP, 0/TCP
    Command: /docker-entrypoint.sh
    Args:
      native-k8s
      $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
    State: Waiting
      Reason: CrashLoopBackOff
    Last State: Terminated
      Reason: Error
      Exit Code: 127
      Started: Wed, 23 Dec 2020 17:37:28 +0800
      Finished: Wed, 23 Dec 2020 17:37:28 +0800
    Ready: False
    Restart Count: 11
    Limits:
      cpu: 1
      memory: 1600Mi
    Requests:
      cpu: 1
      memory: 1600Mi
    Environment:
      _POD_IP_ADDRESS: (v1:status.podIP)
    Mounts:
      /opt/flink/conf from flink-config-volume (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-9hdqt (ro)
Conditions:
  Type            Status
  Initialized     True
  Ready           False
  ContainersReady False
  PodScheduled    True
Volumes:
  flink-config-volume:
    Type: ConfigMap (a volume populated by a ConfigMap)
    Name: flink-config-my-first-application-cluster
    Optional: false
  default-token-9hdqt:
    Type: Secret (a volume populated by a Secret)
    SecretName: default-token-9hdqt
    Optional: false
QoS Class: Guaranteed
Node-Selectors:
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type    Reason     Age                From               Message
  Normal  Scheduled  15d                default-scheduler  Successfully assigned default/my-first-application-cluster-59c4445df4-4ss2m to minikube
  Normal  Pulled     15d                kubelet            Successfully pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in 513.7913ms
  Normal  Pulled     15d                kubelet            Successfully pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in 374.1125ms
  Normal  Pulled     15d                kubelet            Successfully pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in 360.6719ms
  Normal  Created    15d (x4 over 15d)  kubelet            Created container flink-job-manager
  Normal  Started    15d (x4 over 15d)  kubelet            Started container flink-job-manager
  Normal  Pulled     15d                kubelet            Successfully pulled image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1" in 374.2637ms
  Normal  Pulling    15d (x5 over 15d)  kubelet            Pulling image "registry.cn-shenzhen.aliyuncs.com/syni_test/flink:v1"
  Warning BackOff    15d (x141
Re: Re: sql-client won't start after configuring Hive
Hi,
With the Table API you can set Flink's security options to specify the principal and keytab [1].
For the SQL client, try running kinit manually before starting it and see whether that works.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems

On Fri, Jan 8, 2021 at 10:06 AM amenhub wrote:
> Oh? The HMS does have Kerberos enabled. Is there any other workaround?
>
> From: 叶贤勋
> Sent: 2021-01-08 10:03
> To: user-zh@flink.apache.org
> Subject: Re: sql-client won't start after configuring Hive
> Is the HMS secured with Kerberos?
> The community Hive connector currently does not support accessing a Kerberized HMS.
>
> On Jan 7, 2021, 18:39, amenhub wrote:
> After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh embedded.
>
> Error message: https://imgchr.com/i/smQrlj
>
> flink version: 1.12
> hive version: 3.1.0

--
Best regards!
Rui Li
Re: Flink 1.11.2 streaming job fails with "is running beyond physical memory limits. Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing container"
Hi,
This could be off-heap memory overuse. You can refer to a recent post on the Chinese community blog, 《详解 Flink 容器化环境下的 OOM Killed》 (a detailed look at OOM Killed in containerized Flink environments), and I'd suggest increasing the jvm-overhead related options first [1].

[1] https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247490197=1=b0893a9bf12fbcae76852a156302de95

Best regards,
唐云

From: Yang Peng
Sent: Thursday, January 7, 2021 12:24
To: user-zh
Subject: Flink 1.11.2 streaming job fails with "is running beyond physical memory limits. Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing container"

Hi everyone, I have a question. We have a streaming job running on Flink 1.11.2 with the RocksDB state backend. Recently an alert fired because the container was killed for exceeding its physical memory limit. Looking at the monitoring, the taskmanager heap usage did not exceed its limit, direct memory usage stayed within a stable range without exceeding its limit, and no OOM error was reported. Does that mean the overuse is in non-heap memory? The complete error is:

Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 180421 180362 180362 180362 (java) 258262921 59979106 30306209792 6553277 /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC -Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=1073741824b -D taskmanager.memory.network.min=1073741824b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=12750684160b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=11408506880b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address=flink-cm8.jd.163.org -Dweb.port=0 -Dweb.tmpdir=/tmp/flink-web-9197a884-03b9-4865-a0a0-0b6a1c295f2c -Djobmanager.rpc.port=33656 -Drest.address=flink-cm8.jd.163.org -Dsecurity.kerberos.login.keytab=/mnt/ssd/3/yarn/local/usercache/portal/appcache/application_1603181034156_0137/container_e06_1603181034156_0137_01_01/krb5.keytab
|- 180362 180360 180362 180362 (bash) 0 2 116011008 353 /bin/bash -c /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC -Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=1073741824b -D taskmanager.memory.network.min=1073741824b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=12750684160b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=11408506880b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address='flink-cm8.jd.163.org' -Dweb.port='0' -Dweb.tmpdir='/tmp/flink-web-9197a884-03b9-4865-a0a0-0b6a1c295f2c' -Djobmanager.rpc.port='33656' -Drest.address='flink-cm8.jd.163.org' -Dsecurity.kerberos.login.keytab='/mnt/ssd/3/yarn/local/usercache/portal/appcache/application_1603181034156_0137/container_e06_1603181034156_0137_01_01/krb5.keytab' 1> /mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.out 2> /mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.err

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

2021-01-07 11:51:00,781 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: 银河SDK原始日志 (18/90) (51ac2f29df472d001ce9b4307636ac1c) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@1aad00fa.
java.lang.Exception: Container [pid=180362,containerID=container_e06_1603181034156_0137_01_02] is running beyond physical memory limits. Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing container.
Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 180421 180362 180362 180362 (java) 258262921 59979106 30306209792 6553277 /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC -Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D
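A sketch of the suggestion above (the values are illustrative assumptions, not tuned for this job): enlarging the JVM overhead budget leaves headroom inside the container limit for native allocations, e.g. RocksDB using more than its managed-memory budget.

# flink-conf.yaml
taskmanager.memory.jvm-overhead.min: 1gb
taskmanager.memory.jvm-overhead.max: 2gb
taskmanager.memory.jvm-overhead.fraction: 0.2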
Re: Re: Re: Re: Which metric to choose for alerting when monitoring Flink jobs with Grafana
Because numRestarts is a cumulative value, you have to check whether the current value has increased compared with the previous one in order to tell whether a failover has happened.
Also, I do not recommend using the YARN application state to judge the state of a Flink job: if the job is configured with a restart strategy, the YARN application state stays RUNNING even while the job keeps failing over, so the problem would go unnoticed.

Best regards,
唐云

From: bradyMk
Sent: Thursday, January 7, 2021 16:38
To: user-zh@flink.apache.org
Subject: Re: Re: Re: Re: Which metric to choose for alerting when monitoring Flink jobs with Grafana

OK, I'll look into it. Thanks for the guidance~

-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
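A hedged example of the "compare with the previous value" idea (this assumes the metrics are scraped into Prometheus and that the counter is exposed as flink_jobmanager_job_numRestarts; the exact name depends on the configured reporter):

# Grafana/Prometheus alert expression: fire when the cumulative restart
# counter grew within the last 5 minutes, i.e. a failover happened.
increase(flink_jobmanager_job_numRestarts[5m]) > 0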
Re: Ways to submit SQL jobs
Can Zeppelin 0.9 already act as a job server for Flink? Last time I heard Jeff Zhang speak at the Yunqi Conference he said this was planned for later; is it already usable now?

Peihui He wrote on Fri, Jan 8, 2021 at 9:21 AM:
> You can give Zeppelin 0.9 a try:
> http://zeppelin.apache.org/
>
> jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:
> > At our company, SQL jobs are still submitted with Jark's flink-sql-submit project. I'd like to ask:
> > 1. Is there a better way to submit SQL jobs?
> > 2. Since Flink 1.12 already unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
> > 3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?
Fw: How to specify the number of taskmanagers when submitting a job with Flink on k8s
2021-01-08 09:47:31,636 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 9 of job 5e953fb772f9030c728e7c0498555ae2 expired before completing.
2021-01-08 09:47:31,637 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_265]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_265]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_265]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_265]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_265]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_265]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]

---------- Forwarded message ----------
From: "旧城以西"
Date: 2021-01-08 10:12:45
To: "user-zh@flink.apache.org"
Subject: How to specify the number of taskmanagers when submitting a job with Flink on k8s

Hi all,
I'm running a Flink cluster deployed with Flink on k8s in session mode. The cluster has 3 taskmanagers, each with 8 slots. When I submit the job with a parallelism of 3, the job always ends up on a single taskmanager, which causes data skew, and no checkpoint ever succeeds. Is there a parameter for Flink on k8s, similar to the yarn slots setting on YARN, to control how many taskmanagers are used?

flink taskmanager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125

flink jobmanager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first

flink-taskmanager.yml:
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
      - name: registrysecret

Flink code:
// obtain the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure the checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//
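A hedged sketch of checkpoint settings that are sometimes relaxed when checkpoints expire before completing, as in the log above (values are illustrative assumptions; this gives checkpoints more slack but does not address the underlying skew):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60 * 1000)                                // checkpoint every minute
env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000)      // allow slow checkpoints to finish
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30 * 1000)  // breathing room between attempts
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)    // don't fail the job on the first expiry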
How to specify the number of taskmanagers when submitting a job with Flink on k8s
Hi all,
I'm running a Flink cluster deployed with Flink on k8s in session mode. The cluster has 3 taskmanagers, each with 8 slots. When I submit the job with a parallelism of 3, the job always ends up on a single taskmanager, which causes data skew, and no checkpoint ever succeeds. Is there a parameter for Flink on k8s, similar to the yarn slots setting on YARN, to control how many taskmanagers are used? The configuration is attached below; a possible workaround sketch follows after this message.

flink taskmanager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125

flink jobmanager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first

flink-taskmanager.yml:
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
      - name: registrysecret

Flink code:
// obtain the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure the checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// set the checkpoint interval
env.enableCheckpointing(1000*60);
// set the watermark generation interval
env.getConfig().setAutoWatermarkInterval(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, org.apache.flink.api.common.time.Time.minutes(1)));
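A hedged sketch of two session-cluster options that are sometimes used so a low-parallelism job does not land entirely on one TaskManager (assumptions about what might help here, not verified fixes):

# Option 1: give each TaskManager fewer slots, so 3 parallel subtasks
# cannot all be placed into a single TaskManager.
taskmanager.numberOfTaskSlots: 1

# Option 2: ask the scheduler to spread slots evenly across registered TaskManagers.
cluster.evenly-spread-out-slots: true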
Re: Re: sql-client won't start after configuring Hive
Oh? The HMS does have Kerberos enabled. Is there any other workaround?

From: 叶贤勋
Sent: 2021-01-08 10:03
To: user-zh@flink.apache.org
Subject: Re: sql-client won't start after configuring Hive

Is the HMS secured with Kerberos?
The community Hive connector currently does not support accessing a Kerberized HMS.

On Jan 7, 2021, 18:39, amenhub wrote:
After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh embedded.

Error message: https://imgchr.com/i/smQrlj

flink version: 1.12
hive version: 3.1.0
Re: sql-client won't start after configuring Hive
Is the HMS secured with Kerberos?
The community Hive connector currently does not support accessing a Kerberized HMS.

On Jan 7, 2021, 18:39, amenhub wrote:
After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh embedded.

Error message: https://imgchr.com/i/smQrlj

flink version: 1.12
hive version: 3.1.0
Re: Ways to submit SQL jobs
On my side there is actually a generic Flink Jar job underneath; the Flink SQL code and the job's configuration parameters are passed into that Flink Jar as program arguments. Of course there are now many other ways to achieve this too; you can also check the link mentioned above.

Best,
LakeShen

Peihui He wrote on Fri, Jan 8, 2021 at 9:21 AM:
> You can give Zeppelin 0.9 a try:
> http://zeppelin.apache.org/
>
> jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:
> > At our company, SQL jobs are still submitted with Jark's flink-sql-submit project. I'd like to ask:
> > 1. Is there a better way to submit SQL jobs?
> > 2. Since Flink 1.12 already unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
> > 3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?
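A hedged sketch of the pattern described above (the class name and argument layout are illustrative assumptions, not the poster's actual code): a generic "SQL runner" jar receives the SQL text as a program argument and submits it through the Table API, and the platform then launches this jar with flink run.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object SqlRunner {
  def main(args: Array[String]): Unit = {
    val sql = args(0)  // the SQL statement handed over by the submission platform
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)
    tEnv.executeSql(sql)  // submit the single statement
  }
}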
Re: Ways to submit SQL jobs
You can give Zeppelin 0.9 a try:
http://zeppelin.apache.org/

jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:
> At our company, SQL jobs are still submitted with Jark's flink-sql-submit project. I'd like to ask:
> 1. Is there a better way to submit SQL jobs?
> 2. Since Flink 1.12 already unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
> 3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?
Re: Ways to submit SQL jobs
You can give this a try: https://github.com/ververica/flink-sql-gateway
If you have related requirements, feel free to open an issue there.

jiangjiguang719 wrote on Thu, Jan 7, 2021 at 8:34 PM:
> At our company, SQL jobs are still submitted with Jark's flink-sql-submit project. I'd like to ask:
> 1. Is there a better way to submit SQL jobs?
> 2. Since Flink 1.12 already unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
> 3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?

--
With kind regards
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com
QQ: 3239559
How to downgrade Avro to 1.8.2 in Flink 1.12?
Hi everyone,

Problems encountered while downgrading Avro:

The 1.12 release notes mention that Avro is now at 1.10 but can be downgraded to 1.8.2 if needed:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/release-notes/flink-1.12.html#upgrade-to-avro-version-1100-from-182-flink-18192

When I tried the downgrade, I ran into the following problems.

1. Depending directly on 1.8.2 makes AvroSchemaConverter#nullableSchema in flink-avro fail with:

java.lang.NoSuchMethodError: org.apache.avro.Schema.isNullable()Z
at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.nullableSchema(AvroSchemaConverter.java:448)

Comparing Avro 1.8.2 and 1.10.0, the Schema.isNullable() method simply does not exist in 1.8.2.

2. To work around problem 1, I tried using the maven shade plugin to relocate the org.apache.avro dependency. The pom is at https://paste.ubuntu.com/p/SMYHy66bc6/, but the build fails with:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:3.2.0:shade (shade-deps) on project leyan-flink: Error creating shaded jar: Problem shading JAR /Users/dongzhi/.m2/repository/org/glassfish/jersey/core/jersey-common/2.30/jersey-common-2.30.jar entry META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class: org.apache.maven.plugin.MojoExecutionException: Error in ASM processing class META-INF/versions/11/org/glassfish/jersey/internal/jsr166/SubmissionPublisher$1.class: UnsupportedOperationException -> [Help 1]
[ERROR]

I have not found a solution for this yet, hence this request for help from the community.

Why I want to downgrade Avro:

I hit a problem when integrating flink-sql with Hive. Using Avro 1.10 worked fine before, but when I added the Hive dependencies in order to use Hive's built-in functions in SQL, with these dependencies:

flink-avro-confluent-registry-1.12.0
flink-connector-hive_2.11-1.12.0 (provided)
hive-exec 2.3.4 (provided)
hadoop-mapreduce-client-core 2.6.0-cdh5.13.3 <= added because I ran into the same problem as http://apache-flink.147419.n8.nabble.com/Flink-td7866.html

and these extra jars in flink/lib:

flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar
hadoop-mapreduce-client-core-2.6.0-cdh5.13.3.jar

I got an error similar to https://github.com/confluentinc/schema-registry/issues/1432, which is why I tried downgrading Avro to 1.8.2:

java.lang.NoSuchFieldError: FACTORY
at org.apache.avro.Schemas.toString(Schemas.java:36) ~[trade-event.jar:1.12.0]

However, analyzing the whole project with mvn dependency:tree, nothing uses Avro 1.8.2. The Flink application is deployed on-yarn per-job; the production CDH cluster uses a rather old Avro version, and I'm not sure whether that causes Avro 1.8.2 to be picked up somewhere.

Thanks
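A hedged sketch of one workaround sometimes used for the ASM error in step 2 (an assumption, not verified against this pom): either upgrade maven-shade-plugin to a newer release whose ASM can read Java 11 class files, or filter the multi-release entries out of the shaded jar, e.g.

<!-- inside the maven-shade-plugin <configuration>: drop the Java-11
     multi-release entries that the plugin's ASM pass cannot process -->
<filters>
  <filter>
    <artifact>*:*</artifact>
    <excludes>
      <exclude>META-INF/versions/11/**</exclude>
    </excludes>
  </filter>
</filters>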
Ways to submit SQL jobs
At our company, SQL jobs are still submitted with Jark's flink-sql-submit project. I'd like to ask:
1. Is there a better way to submit SQL jobs?
2. Since Flink 1.12 already unifies batch and streaming, how should SQL be submitted for ad-hoc queries?
3. Roughly when will the SQL Client Gateway be released by the community, and will it be production-ready?
Flink 1.12 fails when triggering a savepoint
The error message is as follows:

java.lang.IllegalArgumentException: Can not set long field com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
at sun.reflect.UnsafeLongFieldAccessorImpl.set(UnsafeLongFieldAccessorImpl.java:80)
at java.lang.reflect.Field.set(Field.java:764)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:409)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

From the stack trace, the code where the error occurs is:

try {
    for (int i = 0; i < numFields; i++) {
        boolean isNull = source.readBoolean();

        if (fields[i] != null) {
            if (isNull) {
                fields[i].set(target, null); // fails here: a null is set, but this field is a primitive long, not the boxed type
            } else {
                Object field = fieldSerializers[i].deserialize(source);
                fields[i].set(target, field);
            }
        } else if (!isNull) {
            // read and dump a pre-existing field value
            fieldSerializers[i].deserialize(source);
        }
    }
} catch (IllegalAccessException e) {
    throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}
sql-client won't start after configuring Hive
After configuring the Hive catalog in sql-client-defaults.yaml, the SQL client cannot be started via sql-client.sh embedded.

Error message: https://imgchr.com/i/smQrlj

flink version: 1.12
hive version: 3.1.0
Don't Flink operators have something like Spark's cache operation?
Hi, when an operator's output is used multiple times, how can I cache it, similar to Spark's cache operation?

val env = getBatchEnv
val ds = env.fromElements("a","b","c")

val ds2 = ds.map(x=>{
  println("map op")
  x.charAt(0).toInt+1
})

// this prints "map op" three times
ds2.print()

// this prints "map op" another three times
ds2.filter(_>100).print()
Re: Re: Re: Re: Which metric to choose for alerting when monitoring Flink jobs with Grafana
OK, I'll look into it. Thanks for the guidance~

-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: Re: Which metric to choose for alerting when monitoring Flink jobs with Grafana
1. Yes, it is developed in-house.
2. It is called on a schedule: http://hadoop.apache.org/docs/r2.8.5/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API

On 2021-01-07 16:15:25, "bradyMk" wrote:
> Hi~
> May I ask:
> 1. Is the real-time computing platform you mentioned developed in-house?
> 2. How do you call the YARN REST API every minute to get the job status? Is it a scheduled script? I have never called the YARN REST API and don't yet know how to do it...
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
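A hedged example of such a scheduled call (the ResourceManager host and port are placeholders; the applicationTypes filter assumes Flink-on-YARN's default application type):

# poll the Cluster Applications API for running Flink applications
curl "http://<rm-host>:8088/ws/v1/cluster/apps?states=RUNNING&applicationTypes=Apache%20Flink"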
Re: Which metric to choose for alerting when monitoring Flink jobs with Grafana
Hi Yun Tang~
I don't quite understand why monitoring this metric requires maintaining state. What state should be maintained, and how would I maintain it?

-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: Which metric to choose for alerting when monitoring Flink jobs with Grafana
Hi~
May I ask:
1. Is the real-time computing platform you mentioned developed in-house?
2. How do you call the YARN REST API every minute to get the job status? Is it a scheduled script? I have never called the YARN REST API and don't yet know how to do it...

-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/