Re: Question about Flink SQL

2020-07-01 Post by forideal
Hi Benchao,

Any suggestions for optimizing a MySQL dimension (lookup) table once its cache is disabled?

For example, at 20k records per second, disabling the cache puts heavy pressure on MySQL. I wonder whether making the MySQL lookup async + batch would improve performance, or whether it would have side effects.
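For what it's worth, the core of the batching idea can be sketched without any Flink dependencies (the class below is illustrative, not Flink API): buffer incoming lookup keys and issue one query per batch, e.g. a single SELECT ... WHERE key IN (...), so N point lookups become roughly N/batchSize round trips.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Illustrative micro-batcher: buffers lookup keys and flushes them
 * as one batch once `batchSize` keys have accumulated. A real async
 * lookup would also flush on a timer to bound latency.
 */
class LookupBatcher {
    private final int batchSize;
    private final Consumer<List<String>> batchQuery; // e.g. builds one SELECT ... WHERE key IN (...)
    private final List<String> buffer = new ArrayList<>();

    LookupBatcher(int batchSize, Consumer<List<String>> batchQuery) {
        this.batchSize = batchSize;
        this.batchQuery = batchQuery;
    }

    void add(String key) {
        buffer.add(key);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            // one round trip for the whole batch instead of one per key
            batchQuery.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

At 20k records/s, a batch size of 100 would mean on the order of 200 queries per second against MySQL instead of 20k, at the cost of a small buffering delay; the main side effect to weigh is that results looked up within one batch window may be slightly stale relative to each other.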



Best, forideal.







On Jul 1, 2020 at 13:22, Benchao Li wrote:


I think you just need to declare the same MySQL table again as a dimension table: write two DDLs, one for the dimension table and one for the sink.
If you consider the table to be changing in real time, you can turn off the dimension table's cache so that every lookup fetches the latest data from MySQL.

Note that the DDL itself does not distinguish whether a table is a dimension table or a sink table; its role is decided only by how you use it in your SQL.
In theory one DDL can serve as both a dimension table and a sink. (Some of their options may differ, though, so writing two separate DDLs should be clearer.)
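As a sketch of the two-DDL approach on Flink 1.10 (table, field, and credential values are made up; the option keys are from the 1.10 JDBC connector, where the lookup cache is off unless the cache options are set):

```sql
-- DDL 1: the MySQL table used as a dimension (lookup) table.
-- No connector.lookup.cache.* options => every lookup goes to MySQL.
CREATE TABLE user_dim (
  user_id BIGINT,
  city STRING
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/db',
  'connector.table' = 'users',
  'connector.username' = 'flink',
  'connector.password' = 'secret'
);

-- DDL 2: the same physical table declared again, used only as the sink.
CREATE TABLE user_sink (
  user_id BIGINT,
  city STRING
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/db',
  'connector.table' = 'users',
  'connector.username' = 'flink',
  'connector.password' = 'secret'
);
```

To re-enable caching later, 'connector.lookup.cache.max-rows' and 'connector.lookup.cache.ttl' can be added to the dimension-table DDL.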

zya wrote on Tue, Jun 30, 2020 at 11:26 PM:

> Can a table written to by a sink also be used as a dimension table? The sink keeps writing, so as a dimension table it would keep changing.
>
> -- Original message --
> From: "Benchao Li" Sent: Tuesday, Jun 30, 2020, 11:14 PM
> To: "user-zh"
> Subject: Re: Question about Flink SQL
>
>
>
> A dimension-table join should do it.
>
>
> zya
> > Hi all, I have a question:
> >     I want to implement a job with Flink SQL: the source is Kafka and the sink is MySQL.
> >
> >
>     When writing to MySQL, I'd like to first fetch the existing row from MySQL by key, check it, and then decide how to write the data. Can Flink 1.10 do this?
>
>
>
> --
>
> Best,
> Benchao Li



--

Best,
Benchao Li


Re: Flink job submission methods

2020-07-01 Post by Dream-底限
Got it, thanks.

On Thu, Jul 2, 2020 at 12:37 PM jianxu  wrote:

> Take a look at this project: https://github.com/todd5167/clusters-submiter; with some adaptation it should meet your needs.
> On 2020-07-02 12:09:05, "Dream-底限" wrote:
> >hi
>
> >Does Flink have a submission method like SparkLauncher that returns the job ID after a successful submit (whether on YARN or standalone)? I want to submit jobs from Java code and get the job ID back after submission. Is there a feature or tool for this?
>


Re: Flink state TTL settings

2020-07-01 Post by Yun Tang
Hi,

The TTL timestamp is actually stored in the state itself [1], together with each entry. So when you restore from a checkpoint, the timestamp in the data is still the one recorded when the entry was originally inserted.

[1]
https://github.com/apache/flink/blob/ba92b3b8b02e099c8aab4b2b23a37dca4558cabd/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L50
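In other words (a plain-Java model of the semantics, not Flink code; `TtlEntry` is illustrative), the insert/update timestamp is serialized next to the value, so restoring from a checkpoint restores the original timestamp, and processing-time expiry keeps counting from it:

```java
/**
 * Illustrative model of Flink's TTL state: the insert/update
 * timestamp is stored next to the value, so it survives a restore,
 * and expiry is computed against that stored timestamp.
 */
class TtlEntry<V> {
    final V value;
    final long timestampMs; // written when the entry was inserted/updated

    TtlEntry(V value, long timestampMs) {
        this.value = value;
        this.timestampMs = timestampMs;
    }

    /**
     * Expired iff now - storedTimestamp >= ttl. `now` keeps advancing
     * across restarts; the stored timestamp does not reset.
     */
    boolean isExpired(long nowMs, long ttlMs) {
        return nowMs - timestampMs >= ttlMs;
    }
}
```

So with a 7-day TTL, expiry is measured from the original write time, not from the restart; note this also means any downtime between checkpoint and restore still counts toward the TTL.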

Best,
Yun Tang


From: hdxg1101300...@163.com 
Sent: Thursday, July 2, 2020 11:17
To: user-zh 
Subject: Flink state TTL settings

Hello,
I'd like to ask about state TTL.
If a job restarts from checkpoints, does the state TTL still take effect? TTL here is based on processing time.
For example, with a 7-day TTL, after restarting from a checkpoint is expiry counted from the time of the first launch, or from the new processing time at recovery? Is the timestamp part of the state, or how is it computed?
Or do I need to register timers to implement this myself?



hdxg1101300...@163.com


Re: Flink job submission methods

2020-07-01 Post by jianxu
Take a look at this project: https://github.com/todd5167/clusters-submiter; with some adaptation it should meet your needs.
On 2020-07-02 12:09:05, "Dream-底限" wrote:
>hi
>Does Flink have a submission method like SparkLauncher that returns the job ID after a successful submit (whether on YARN or standalone)? I want to submit jobs from Java code and get the job ID back after submission. Is there a feature or tool for this?


Flink job submission methods

2020-07-01 Post by Dream-底限
hi
Does Flink have a submission method like SparkLauncher that returns the job ID after a successful submit (whether on YARN or standalone)? I want to submit jobs from Java code and get the job ID back after submission. Is there a feature or tool for this?
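Until a dedicated launcher exists, one pragmatic approach (a sketch; the output format is an assumption to verify against your Flink version) is to start `flink run -d` via ProcessBuilder and parse the JobID line the CLI prints, such as `Job has been submitted with JobID <32 hex chars>`. The REST API (`POST /jars/:jarid/run`, which returns a JSON `jobid` field) is a cleaner alternative.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Extracts the JobID from the stdout of a detached `flink run -d` submit.
 * The expected line format is an assumption; verify it for your version.
 */
class JobIdParser {
    private static final Pattern JOB_ID = Pattern.compile("JobID ([0-9a-f]{32})");

    /** Returns the 32-hex-char job id, or null if no JobID line was found. */
    static String parse(String cliOutput) {
        Matcher m = JOB_ID.matcher(cliOutput);
        return m.find() ? m.group(1) : null;
    }
}
```

The same parser works whether the process output comes from ProcessBuilder's InputStream or a captured log line.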


Re: How to write a UDTAGG query in SQL

2020-07-01 Post by liangji
Got it, thanks Jingsong!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question about Flink SQL

2020-07-01 Post by zya
I'm using the blink planner with a temporal table join; my SQL is:

insert into mysql_sink select C.log_id, C.vic from (select A.log_id,
case when B.cnt>0 and A.server>0 then B.cnt+1 else A.server end as vic
from (select log_id, server, PROCTIME() as proctime from kafka_source) A left
join mysql_source for SYSTEM_TIME AS OF A.proctime AS B on A.log_id=B.log_id )
C group by log_id,vic


Reference: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html


Can redis be used as a temporal table, as source and sink?

~


 




-- Original message --
From: "Benchao Li"

Flink state TTL settings

2020-07-01 Post by hdxg1101300...@163.com
Hello,
I'd like to ask about state TTL.
If a job restarts from checkpoints, does the state TTL still take effect? TTL here is based on processing time.
For example, with a 7-day TTL, after restarting from a checkpoint is expiry counted from the time of the first launch, or from the new processing time at recovery? Is the timestamp part of the state, or how is it computed?
Or do I need to register timers to implement this myself?



hdxg1101300...@163.com


Re: How to write a UDTAGG query in SQL

2020-07-01 Post by Jingsong Li
Hi,

Since UDTAGGs are not part of standard SQL syntax, they are only available via the Table API.

Best,
Jingsong

On Thu, Jul 2, 2020 at 11:10 AM liangji  wrote:

>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/udfs.html#table-aggregation-functions
> Does UDTAGG support a SQL syntax, and if so how is it written? The official docs only show Table API examples.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


How to write a UDTAGG query in SQL

2020-07-01 Post by liangji
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/udfs.html#table-aggregation-functions
Does UDTAGG support a SQL syntax, and if so how is it written? The official docs only show Table API examples.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: YARN-based Flink HA attempt limit not taking effect, and whether jobs relaunched by HA can reuse state

2020-07-01 Post by liangji
I configured HA earlier, and set yarn.application-attempts=2 in Flink, but the result was that the JM process could be killed and restarted without limit.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: YARN-based Flink HA attempt limit not taking effect, and whether jobs relaunched by HA can reuse state

2020-07-01 Post by MuChen
hi, jiliang1993:

I checked YARN's yarn.resourcemanager.am.max-attempts together with yarn.application-attempt-failures-validity-interval: attempts are only counted within that validity interval (10s by default), and once more than 10s has passed the attempt counter starts over from 1. The effective limit is min(YARN's yarn.resourcemanager.am.max-attempts, Flink's yarn.application-attempts), enforced by YARN.

Best,
MuChen.


-- Original message --
From: "jiliang1993"
> Following this blog https://blog.csdn.net/cndotaci/article/details/106870413
> I configured Flink HA on YARN with 2 retries; even on the 6th kill, YARN still relaunched the job.
>
> 1. Why doesn't the retry count in the configuration below take effect?
> 2. Can a job relaunched by HA reuse the state from the failed run?
>
> Flink version: 1.10.0
> flink-conf.yaml:
> $ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m
> taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1 high-availability: zookeeper
> high-availability.storageDir: hdfs:///flink/ha/
> high-availability.zookeeper.quorum:
> uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1
> state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir:
> hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60
> state.backend.incremental: true jobmanager.execution.failover-strategy:
> region jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/
> historyserver.web.port: 8082 historyserver.archive.fs.dir:
> hdfs:///flink/flink-jobs/ historyserver.archive.fs.refresh-interval: 1
> # HA retry count yarn.application-attempts: 2
> Log of sshing to the JM node and manually killing the job:
> [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner 18026 YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 18026
> [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 -- process information unavailable 17527 PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412 YarnJobClusterEntrypoint 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 20412
> [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 21926 YarnJobClusterEntrypoint 23207 Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 21926
> [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 23318 YarnJobClusterEntrypoint 26279 Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 23318

Re: YARN-based Flink HA attempt limit not taking effect, and whether jobs relaunched by HA can reuse state

2020-07-01 Post by MuChen
hi,
Got it, thanks.

Best,
MuChen.




-- Original message --
> Following this blog https://blog.csdn.net/cndotaci/article/details/106870413
> I configured Flink HA on YARN with 2 retries; even on the 6th kill, YARN still relaunched the job.
>
> 1. Why doesn't the retry count in the configuration below take effect?
>
> 2. Can a job relaunched by HA reuse the state from the failed run?
>
> Flink version: 1.10.0
>
> flink-conf.yaml:
> $ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m
> taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1 high-availability: zookeeper
> high-availability.storageDir: hdfs:///flink/ha/
> high-availability.zookeeper.quorum:
> uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1
> state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir:
> hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60
> state.backend.incremental: true jobmanager.execution.failover-strategy:
> region jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/
> historyserver.web.port: 8082 historyserver.archive.fs.dir:
> hdfs:///flink/flink-jobs/ historyserver.archive.fs.refresh-interval: 1
> # HA retry count yarn.application-attempts: 2
> Log of sshing to the JM node and manually killing the job:
> [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853
> YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner
> 18026 YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 18026 [root@uhadoop-op3raf-task48
> ~]# jps 34785 YarnTaskExecutorRunner 16853 -- process information
> unavailable 17527 PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412
> YarnJobClusterEntrypoint 39599 NodeManager [root@uhadoop-op3raf-task48
> ~]# kill -9 20412 [root@uhadoop-op3raf-task48 ~]# jps 34785
> YarnTaskExecutorRunner 21926 YarnJobClusterEntrypoint 23207 Jps 17527
> PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 21926 [root@uhadoop-op3raf-task48
> ~]# jps 34785 YarnTaskExecutorRunner 23318 YarnJobClusterEntrypoint 26279
> Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 23318

Re: Re: Flink 1.10 checkpoint NullPointerException after running for a while

2020-07-01 Post by 程龙

They are all failures to allocate resources (slots); it still looks like the null checkpoint is the cause, but I don't know why it is null.





On 2020-07-01 20:51:34, "JasonLee" <17610775...@163.com> wrote:
>Go to the specific TM, find the relevant operator, and check whether there is any exception there.
>
>
>| |
>JasonLee
>|
>|
>Email: 17610775...@163.com
>|
>
>Signature is customized by Netease Mail Master
>
On Jul 1, 2020 at 20:43, 程龙 wrote:
>On Flink 1.10, after the job runs for a few hours it fails to perform checkpoints with a NullPointerException, as follows:
>
>
>java.lang.Exception: Could not perform checkpoint 3201 for operator Filter -> 
>Map (2/8).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
>   at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
>   at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>   at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>   at java.lang.Thread.run(Thread.java:745)
>Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)


Re: Flink 1.10 checkpoint NullPointerException after running for a while

2020-07-01 Post by JasonLee
Go to the specific TM, find the relevant operator, and check whether there is any exception there.


| |
JasonLee
|
|
Email: 17610775...@163.com
|

Signature is customized by Netease Mail Master

On Jul 1, 2020 at 20:43, 程龙 wrote:
On Flink 1.10, after the job runs for a few hours it fails to perform checkpoints with a NullPointerException, as follows:


java.lang.Exception: Could not perform checkpoint 3201 for operator Filter -> 
Map (2/8).
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
   at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
   at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
   at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
   at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
   at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
   at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
   at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)

Flink 1.10 checkpoint NullPointerException after running for a while

2020-07-01 Post by 程龙
On Flink 1.10, after the job runs for a few hours it fails to perform checkpoints with a NullPointerException, as follows:


java.lang.Exception: Could not perform checkpoint 3201 for operator Filter -> 
Map (2/8).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803)

Re: YARN-based Flink HA attempt limit not taking effect, and whether jobs relaunched by HA can reuse state

2020-07-01 Post by 王松
hi, MuChen
1. yarn.application-attempts interacts with another setting, yarn.application-attempt-failures-validity-interval. Roughly, the Flink job is only considered failed if it fails the configured number of times within that interval; once the interval has passed, counting starts over. For example, with yarn.application-attempts: 2 and yarn.application-attempt-failures-validity-interval = 10000 (the default, 10s), the job only truly fails if it fails and restarts twice within 10 seconds.
2. If checkpointing is configured, the state from the failed run is reused.

This is my personal understanding; questions and discussion welcome.
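Putting the two settings side by side (values illustrative; the first lives in flink-conf.yaml, the YARN ones in yarn-site.xml, and the effective limit is the smaller of the Flink and YARN attempt counts):

```yaml
# flink-conf.yaml
yarn.application-attempts: 2

# yarn-site.xml equivalents, shown as key: value for brevity
yarn.resourcemanager.am.max-attempts: 2
# failures older than this window (ms) no longer count toward the limit
yarn.application-attempt-failures-validity-interval: 10000
```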

MuChen <9329...@qq.com> wrote on Wed, Jul 1, 2020 at 7:50 PM:

> hi, all:
>
> Following this blog https://blog.csdn.net/cndotaci/article/details/106870413
> I configured Flink HA on YARN; while testing I found the configured limit of 2 failure retries did not take effect, and even on my 6th kill YARN still relaunched the job.
>
> My questions:
>
> 1. Why doesn't the retry count in the configuration below take effect?
>
> 2. Can a job relaunched by HA reuse the state from the failed run?
>
> Flink version: 1.10.0
>
> flink-conf.yaml:
> $ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m
> taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1 high-availability: zookeeper
> high-availability.storageDir: hdfs:///flink/ha/
> high-availability.zookeeper.quorum:
> uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1
> state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir:
> hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60
> state.backend.incremental: true jobmanager.execution.failover-strategy:
> region jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/
> historyserver.web.port: 8082 historyserver.archive.fs.dir:
> hdfs:///flink/flink-jobs/ historyserver.archive.fs.refresh-interval: 1
> # HA retry count yarn.application-attempts: 2
> Log of sshing to the JM node and manually killing the job:
> [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853
> YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner
> 18026 YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 18026 [root@uhadoop-op3raf-task48
> ~]# jps 34785 YarnTaskExecutorRunner 16853 -- process information
> unavailable 17527 PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412
> YarnJobClusterEntrypoint 39599 NodeManager [root@uhadoop-op3raf-task48
> ~]# kill -9 20412 [root@uhadoop-op3raf-task48 ~]# jps 34785
> YarnTaskExecutorRunner 21926 YarnJobClusterEntrypoint 23207 Jps 17527
> PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 21926 [root@uhadoop-op3raf-task48
> ~]# jps 34785 YarnTaskExecutorRunner 23318 YarnJobClusterEntrypoint 26279
> Jps 17527 PrestoServer 33289 YarnTaskExecutorRunner 39599 NodeManager
> [root@uhadoop-op3raf-task48 ~]# kill -9 23318


YARN-based Flink HA attempt limit not taking effect, and whether jobs relaunched by HA can reuse state

2020-07-01 Post by MuChen
hi, all:

Following this blog https://blog.csdn.net/cndotaci/article/details/106870413 I configured Flink HA on YARN; while testing I found the configured limit of 2 failure retries did not take effect, and even on my 6th kill YARN still relaunched the job.

My questions:

1. Why doesn't the retry count in the configuration below take effect?

2. Can a job relaunched by HA reuse the state from the failed run?

Flink version: 1.10.0

flink-conf.yaml:
$ grep -v ^# flink-conf.yaml |grep -v ^$ jobmanager.rpc.address: localhost 
jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m 
taskmanager.memory.process.size: 1568m taskmanager.numberOfTaskSlots: 1 
parallelism.default: 1 high-availability: zookeeper 
high-availability.storageDir: hdfs:///flink/ha/ 
high-availability.zookeeper.quorum: 
uhadoop-op3raf-master1,uhadoop-op3raf-master2,uhadoop-op3raf-core1 
state.checkpoints.dir: hdfs:///flink/checkpoint state.savepoints.dir: 
hdfs:///flink/flink-savepoints state.checkpoints.num-retained:60 
state.backend.incremental: true jobmanager.execution.failover-strategy: region 
jobmanager.archive.fs.dir: hdfs:///flink/flink-jobs/ historyserver.web.port: 
8082 historyserver.archive.fs.dir: hdfs:///flink/flink-jobs/ 
historyserver.archive.fs.refresh-interval: 1 # HA retry count
yarn.application-attempts: 2
Log of sshing to the JM node and manually killing the job:
[root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 16853 
YarnTaskExecutorRunner 17527 PrestoServer 33289 YarnTaskExecutorRunner 18026 
YarnJobClusterEntrypoint 20283 Jps 39599 NodeManager 
[root@uhadoop-op3raf-task48 ~]# kill -9 18026 [root@uhadoop-op3raf-task48 ~]# 
jps 34785 YarnTaskExecutorRunner 16853 -- process information unavailable 17527 
PrestoServer 21383 Jps 33289 YarnTaskExecutorRunner 20412 
YarnJobClusterEntrypoint 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill 
-9 20412 [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 21926 
YarnJobClusterEntrypoint 23207 Jps 17527 PrestoServer 33289 
YarnTaskExecutorRunner 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill 
-9 21926 [root@uhadoop-op3raf-task48 ~]# jps 34785 YarnTaskExecutorRunner 23318 
YarnJobClusterEntrypoint 26279 Jps 17527 PrestoServer 33289 
YarnTaskExecutorRunner 39599 NodeManager [root@uhadoop-op3raf-task48 ~]# kill 
-9 23318

Re: Flink 1.10 writing to HBase with Flink SQL fails with: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2020-07-01 Post by Leonard Xu
Hello,

Try putting MD5(real_income_no) as rowkey in an inner query, and grouping by rowkey directly in the outermost query. Flink 1.11 and later support declaring a PK in the table DDL, so from 1.11 on the key no longer needs to be inferred.

Best,
Leonard Xu


> On Jul 1, 2020 at 13:51, tiantingting5...@163.com wrote:
> 
> MD5(real_income_no) as rowkey,



Re: Mail from boss_大数据开发_史文龙

2020-07-01 Post by Leonard Xu
Hello,

To unsubscribe from the Flink Chinese community mailing list, just send an email with any content to user-zh-unsubscribe@flink.apache.org; see [1] for how mailing-list subscription and unsubscription work.

Best

[1] https://flink.apache.org/community.html#mailing-lists


> On Jul 1, 2020 at 17:35, boss_大数据开发_史文龙 wrote:
> 
> xxx-unsubscr...@flink.apache.org 


Mail from boss_大数据开发_史文龙

2020-07-01 Post by boss_大数据开发_史文龙
xxx-unsubscr...@flink.apache.org 

Re: flink sql ddl CREATE TABLE kafka011 sink: how to enable transactional exactly-once?

2020-07-01 Post by 静谧雨寒
The issue shows Fix Version/s: None; it seems I'll have to use the DataStream API for now.

-- Original message --
From: "方盛凯" https://issues.apache.org/jira/browse/FLINK-15221

https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/

Re: flink sql ddl CREATE TABLE kafka011 sink: how to enable transactional exactly-once?

2020-07-01 Post by 方盛凯
We are planning to develop this feature; for details see https://issues.apache.org/jira/browse/FLINK-15221

夏帅 wrote on Wed, Jul 1, 2020 at 3:13 PM:

> Hi, you can try custom implementations of Kafka011TableSourceSinkFactory and Kafka011TableSink to get exactly-once.
>
> Kafka011TableSink
>
>
> @Override
> protected SinkFunction<Row> createKafkaProducer(
>       String topic,
>       Properties properties,
>       SerializationSchema<Row> serializationSchema,
>       Optional<FlinkKafkaPartitioner<Row>> partitioner) {
>    return new FlinkKafkaProducer011<>(
>       topic,
>       new KeyedSerializationSchemaWrapper<>(serializationSchema),
>       properties,
>       partitioner,
>       FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
>       5);
> }
> If you want to make the semantic configurable, see KafkaTableSourceSinkFactoryBase.
>
> Reference:
> https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
> --
> From: 静谧雨寒
> Sent: Wednesday, Jul 1, 2020, 2:33 PM
> To: user-zh
> Subject: flink sql ddl CREATE TABLE kafka011 sink: how to enable transactional exactly-once?
>
> With a Flink SQL CREATE TABLE Kafka sink table and checkpointing enabled, how do I configure the SQL
> sink table to use two-phase-commit transactions for an exactly-once consistency guarantee?
> The official docs say:
> Consistency guarantees: By default, a Kafka sink ingests data with
> at-least-once guarantees into a Kafka topic if the query is executed with
> checkpointing enabled.
> So CREATE TABLE defaults to at-least-once.
>
>


Re: flink sql ddl CREATE TABLE kafka011 sink: how to enable transactional exactly-once?

2020-07-01 Post by 夏帅
Hi, you can try custom implementations of Kafka011TableSourceSinkFactory and Kafka011TableSink to get exactly-once.

Kafka011TableSink


@Override
protected SinkFunction<Row> createKafkaProducer(
      String topic,
      Properties properties,
      SerializationSchema<Row> serializationSchema,
      Optional<FlinkKafkaPartitioner<Row>> partitioner) {
   return new FlinkKafkaProducer011<>(
      topic,
      new KeyedSerializationSchemaWrapper<>(serializationSchema),
      properties,
      partitioner,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
      5);
}
If you want to make the semantic configurable, see KafkaTableSourceSinkFactoryBase.

Reference:
https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
--
From: 静谧雨寒
Sent: Wednesday, Jul 1, 2020, 2:33 PM
To: user-zh
Subject: flink sql ddl CREATE TABLE kafka011 sink: how to enable transactional exactly-once?

With a Flink SQL CREATE TABLE Kafka sink table and checkpointing enabled, how do I configure the SQL
sink table to use two-phase-commit transactions for an exactly-once consistency guarantee?
The official docs say:
Consistency guarantees: By default, a Kafka sink ingests data with
at-least-once guarantees into a Kafka topic if the query is executed with
checkpointing enabled.
So CREATE TABLE defaults to at-least-once.