Re:使用CUMULATE WINDOW 消费upsertkafka遇到的问题

2022-02-28 文章 赵旭晨
sql如下: with effective_chargeorder as ( select o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' ) and o.

使用CUMULATE WINDOW 消费upsertkafka遇到的问题

2022-02-28 文章 赵旭晨
sql如下: with effective_chargeorder as ( select o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' ) and o.

回复: Flink FailureRateRestartStrategy策略异常

2022-02-28 文章 刘 家锹
是的,我们用的pipelined region的Failover策略,region切分这块没找到具体规则的文档。 但我们作业只有一个taskmanager,同时并行度为6,逻辑是从kafka消费处理后写到下游kafka。从日志以及拓扑中来看,猜测我们的作业应该只能按并行度切分成6个,故障的时候就直接超过了。 不过还有点疑问,希望可以解答下哈 1. 对于这种FailoverStrategy+pipeline region的,best practice应该要如何比较好 2. 测试主动kill taskmanager的,因为只有一个taskmanager,预期行为是否应该也是超过重

Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-28 文章 18703416...@163.com
首先确定 source 事件有 eventTime ,比如 source 的返回类型为 MySource 示例代码如下: static class MySource { Long ts; String key; Object object; } env.addSource(new SourceFunction() { @Override public void run(SourceContext ctx) throws Exception { ctx.collect(new MySource()); } @Overrid

Re: Flink FailureRateRestartStrategy策略异常

2022-02-28 文章 胡伟华
Hi, 家锹 你应该使用了 pipelined region 的 Failover Strategy,这种模式下会将作业划分为多个 region,每次故障恢复只重启涉及到的 region. 单台机器故障时,如果多个 region 的 task 同时部署在这台机器上,那这些涉及到的 region 都会触发 failover,以至于达到了 FailureRateRestartStrategy 配置的重启上限,导致作业退出。 至于主动 kill TaskManager 作业重启而不退出,应该和 TaskManager 上运行的 Task 数量相关。 可以参考社区文档:https://n

退订

2022-02-28 文章 谭 海棠
退订 获取 Outlook for iOS

Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-28 文章 Lei Wang
谢谢,这种是可以。 取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期: env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { return value; } }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink() 实际上逆序输出了窗口内的所有记录。 谢谢, 王磊 On Mon, Feb 28,

Re: keyBy 后的 getKey 函数调用了两次

2022-02-28 文章 Lei Wang
谢谢,了解了。 另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出: env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { return value; } }).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1)); 上面的代码我测试了

Re: keyBy 后的 getKey 函数调用了两次

2022-02-28 文章 yidan zhao
keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。 Lei Wang 于2022年3月1日周二 10:49写道: > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。 > > env.addSource(consumer).keyBy(new KeySelector() { > @Override > public String getKey(String value) throws Exception

keyBy 后的 getKey 函数调用了两次

2022-02-28 文章 Lei Wang
接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。 env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { System.out.println(value); return value; } }).addSink(new SinkTest(1)); 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.p

Flink FailureRateRestartStrategy策略异常

2022-02-28 文章 刘 家锹
你好,伙伴们 我们最近碰到一个关于FailureRateRestartStrategy策略的问题,有点困惑。情况是这样子的: Flink版本:0.10.1 部署方式: on Yarn FailureRateRestartStrategy配置:failuresIntervalMS=6,backoffTimeMS=15000,maxFailuresPerInterval=4 当时我们hadoop集群的一台机器假死卡住,而Flink任务的TaskManager就运行在这台机器上。机器故障时,JobManager收到了heartbeat超时异常,从日志上看是连续抛出的4次超时异常(每个异

Re:答复: 回复:如何从复杂的kafka消息体定义 table

2022-02-28 文章 mack143
退订 在 2021-07-09 10:06:19,"Chenzhiyuan(HR)" 写道: >消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下: > >CREATE TABLE MyUserTable( >APPLY_PERSON_ID VARCHAR, >UPDATE_SALARY DECIMAL, >UP_AMOUNT DECIMAL, >CURRENCY VARCHAR, >EXCHANGE_RATE DECIMAL >) with ( >'connector.type' = 'kafka', >'connector

退订

2022-02-28 文章 mack143
退订

Re: Flink UI 经常打不开作业的拓扑图

2022-02-28 文章 yu'an huang
你好, 请问使用的Flink版本是哪一个呢,可以提供下完整的作业提交参数和运行环境吗? 看log猜测是某个UI请求而不是runtime内部出现了问题,确认下如果不访问UI中的作业拓扑图的话整个作业是否可以正常执行。 最后有一个建议把log作为邮件附件发送的话查看起来比较方便一点^_。 > On 28 Feb 2022, at 11:21 PM, xinzhuxiansheng wrote: > > 作业在HA或者单jobManager 写入kafka,消费kafka 都没问题 , 不过,Flink UI > 出现拓扑图打不开的情况只在HA模式下有,单jobManag

Re: Flink UI 经常打不开作业的拓扑图

2022-02-28 文章 胡伟华
日志里没有 Dispatcher 、Job 的启动日志,请问作业是正常工作的吗? 如果作业不正常,可以检查下 leader 选主的逻辑,或者尝试去掉 HA,观察作业是否正常 > 2022年2月28日 下午8:57,xinzhuxiansheng 写道: > > 你是对的,在你的提醒下 ,我看了完整的 log,确实还有一些error log > > > > > Enabling required built-in plugins > Linking flink-s3-fs-hadoop-1.14.2.jar to plugin directory > Successfull

Re: Flink UI 经常打不开作业的拓扑图

2022-02-28 文章 胡伟华
Hi, xinzhuxiansheng 这个异常日志看起来不是根本原因,方便提供下全量 JobManager 日志吗? > 2022年2月28日 下午7:50,xinzhuxiansheng 写道: > > 我的环境: > Flink on Native K8s Application mode,并且用S3协议来做HA部署, 部署后,作业正常工作,可Flink UI > 无法正常打开 作业的拓扑图,页面会提示内部错误 > > > jobmanager会提示很多 类似的异常: > 2022-02-28 08:46:21,171 ERROR > org.apache.flink

Flink UI 经常打不开作业的拓扑图

2022-02-28 文章 xinzhuxiansheng
我的环境: Flink on Native K8s Application mode,并且用S3协议来做HA部署, 部署后,作业正常工作,可Flink UI 无法正常打开 作业的拓扑图,页面会提示内部错误 jobmanager会提示很多 类似的异常: 2022-02-28 08:46:21,171 ERROR org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] - Unhandled exception. java.util.concurrent.CancellationException: null

Re: flink状态恢复

2022-02-28 文章 yidan zhao
从理论上来说,要实现你想要的效果,那就需要在start的时候指定上次停止的任务的jobId。 否则flink无法知道你期望基于哪个任务的最后一次检查点继续。反正都是一个参数,为啥不指定ckpt路径或者savepoint路径呢? Guo Thompson 于2022年2月28日周一 16:51写道: > 要指定savepoint的目录,不然不会从历史状态恢复 > > enginegun 于2022年2月28日周一 16:38写道: > > > flink 重启任务 进行状态恢复是否必须指定checkpoint目录,能否默认读取最后一次生成的checkpoint 进行状态恢复? >

flink-connector-jdbc sink mysql是否存在乱序问题

2022-02-28 文章 Guo Thompson
入口: [image: image.png] 批量处理: [image: image.png] 刷盘: executeBatch按理来讲就是mysql的一个事务。 [image: image.png] *疑惑:从flush中可以看到,底层是分开了两个executeBatch,举一个例子:* *kafka里面消息从flink-cdc通过debizium采集出来,对update的mysql操作会对应两条消息(op:d,op:c),这时候如果d和c两条消息在不同的executor中,在不同的executeBatch,会不会导致乱序?最终丢数据??*

Fwd: flink-connector-jdbc sink mysql是否会存在乱序问题

2022-02-28 文章 Guo Thompson
-- Forwarded message - 发件人: Guo Thompson Date: 2022年2月28日周一 16:39 Subject: flink-connector-jdbc sink mysql是否会存在乱序问题 To: user-zh 入口: [image: image.png] 批量处理: [image: image.png] 刷盘: executeBatch按理来讲就是mysql的一个事务。 [image: image.png] *疑惑:从flush中可以看到,底层是分开了两个executeBatch,举一个例子:* *kaf

Re: flink状态恢复

2022-02-28 文章 Guo Thompson
要指定savepoint的目录,不然不会从历史状态恢复 enginegun 于2022年2月28日周一 16:38写道: > flink 重启任务 进行状态恢复是否必须指定checkpoint目录,能否默认读取最后一次生成的checkpoint 进行状态恢复?

flink状态恢复

2022-02-28 文章 enginegun
flink 重启任务 进行状态恢复是否必须指定checkpoint目录,能否默认读取最后一次生成的checkpoint 进行状态恢复?