Re: Pods are OOMKilled with RocksDB backend after a few checkpoints

2022-02-28 Thread Yun Tang
Hi Alex, Since current default checkpoint policy in RocksDB state backend is still full snapshot, which is actually creating savepoint format. Current savepoint would scan the whole RocksDB with iterators to write data out, and some intermediate data would be kept in memory. I think you could

Re:使用CUMULATE WINDOW 消费upsertkafka遇到的问题

2022-02-28 Thread 赵旭晨
sql如下: with effective_chargeorder as ( select o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' ) and

使用CUMULATE WINDOW 消费upsertkafka遇到的问题

2022-02-28 Thread 赵旭晨
sql如下: with effective_chargeorder as ( select o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' ) and

Re: Flink failure rate restart not work as expect

2022-02-28 Thread Alexander Preuß
Hi, from a first glance it looks like the exception was thrown very rapidly so it exceeded the maxFailuresPerInterval and the FailureRestartStrategy decided not to restart. Why do you think this is different from the expected behavior? Best, Alex On Tue, Mar 1, 2022 at 3:23 AM 刘 家锹 wrote: >

Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-28 Thread 18703416...@163.com
首先确定 source 事件有 eventTime ,比如 source 的返回类型为 MySource 示例代码如下: static class MySource { Long ts; String key; Object object; } env.addSource(new SourceFunction() { @Override public void run(SourceContext ctx) throws Exception { ctx.collect(new MySource()); }

Re: Flink FailureRateRestartStrategy策略异常

2022-02-28 Thread 胡伟华
Hi, 家锹 你应该使用了 pipelined region 的 Failover Strategy,这种模式下会将作业划分为多个 region,每次故障恢复只重启涉及到的 region. 单台机器故障时,如果多个 region 的 task 同时部署在这台机器上,那这些涉及到的 region 都会触发 failover,以至于达到了 FailureRateRestartStrategy 配置的重启上限,导致作业退出。 至于主动 kill TaskManager 作业重启而不退出,应该和 TaskManager 上运行的 Task 数量相关。

Re: pyflink object to java object

2022-02-28 Thread Francis Conroy
Hi Xingbo, I think that might work for me, I'll give it a try On Tue, 1 Mar 2022 at 15:06, Xingbo Huang wrote: > Hi, > With py4j, you can call any Java method. On how to create a Java Row, you > can call the `createRowWithNamedPositions` method of `RowUtils`[1]. > > [1] >

[no subject]

2022-02-28 Thread 谭 海棠
退订 获取 Outlook for iOS

Re: pyflink object to java object

2022-02-28 Thread Xingbo Huang
Hi, With py4j, you can call any Java method. On how to create a Java Row, you can call the `createRowWithNamedPositions` method of `RowUtils`[1]. [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowUtils.java# Best, Xingbo Francis Conroy

退订

2022-02-28 Thread 谭 海棠
退订 获取 Outlook for iOS

退订

2022-02-28 Thread 谭 海棠
退订 获取 Outlook for iOS

Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-28 Thread Lei Wang
谢谢,这种是可以。 取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期: env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { return value; } }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink() 实际上逆序输出了窗口内的所有记录。 谢谢, 王磊 On Mon, Feb 28,

Re: keyBy 后的 getKey 函数调用了两次

2022-02-28 Thread Lei Wang
谢谢,了解了。 另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出: env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { return value; } }).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1));

Re: keyBy 后的 getKey 函数调用了两次

2022-02-28 Thread yidan zhao
keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。 Lei Wang 于2022年3月1日周二 10:49写道: > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。 > > env.addSource(consumer).keyBy(new KeySelector() { > @Override > public String getKey(String value) throws Exception

keyBy 后的 getKey 函数调用了两次

2022-02-28 Thread Lei Wang
接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。 env.addSource(consumer).keyBy(new KeySelector() { @Override public String getKey(String value) throws Exception { System.out.println(value); return value; } }).addSink(new SinkTest(1)); 我自己做测试,每发送一条消息console 会打印两次,也就是

Flink FailureRateRestartStrategy策略异常

2022-02-28 Thread 刘 家锹
你好,伙伴们 我们最近碰到一个关于FailureRateRestartStrategy策略的问题,有点困惑。情况是这样子的: Flink版本:0.10.1 部署方式: on Yarn FailureRateRestartStrategy配置:failuresIntervalMS=6,backoffTimeMS=15000,maxFailuresPerInterval=4

Flink failure rate restart not work as expect

2022-02-28 Thread 刘 家锹
Hi, all We encounter some problem with FailureRateRestartStrategy, which confuse us and don't know how to solove it. Here's the situation: Flink version: 1.10.1 Development env: on Yarn FailureRateRestartStrategy: failuresIntervalMS=6,backoffTimeMS=15000,maxFailuresPerInterval=4 One of our

[no subject]

2022-02-28 Thread 谭 海棠
退订

Re:答复: 回复:如何从复杂的kafka消息体定义 table

2022-02-28 Thread mack143
退订 在 2021-07-09 10:06:19,"Chenzhiyuan(HR)" 写道: >消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下: > >CREATE TABLE MyUserTable( >APPLY_PERSON_ID VARCHAR, >UPDATE_SALARY DECIMAL, >UP_AMOUNT DECIMAL, >CURRENCY VARCHAR, >EXCHANGE_RATE DECIMAL >) with ( >'connector.type' = 'kafka',

退订

2022-02-28 Thread mack143
退订

Re: Flink UI 经常打不开作业的拓扑图

2022-02-28 Thread yu'an huang
你好, 请问使用的Flink版本是哪一个呢,可以提供下完整的作业提交参数和运行环境吗? 看log猜测是某个UI请求而不是runtime内部出现了问题,确认下如果不访问UI中的作业拓扑图的话整个作业是否可以正常执行。 最后有一个建议把log作为邮件附件发送的话查看起来比较方便一点^_。 > On 28 Feb 2022, at 11:21 PM, xinzhuxiansheng wrote: > > 作业在HA或者单jobManager 写入kafka,消费kafka 都没问题 , 不过,Flink UI >

processwindowfunction output Iterator

2022-02-28 Thread HG
Hi, Can processwindowfunction output an Iterator? I need to sort and subtract timestamps from keyed events and then output them all with added elapsed times. Regards Hans

Re: Flink UI 经常打不开作业的拓扑图

2022-02-28 Thread 胡伟华
日志里没有 Dispatcher 、Job 的启动日志,请问作业是正常工作的吗? 如果作业不正常,可以检查下 leader 选主的逻辑,或者尝试去掉 HA,观察作业是否正常 > 2022年2月28日 下午8:57,xinzhuxiansheng 写道: > > 你是对的,在你的提醒下 ,我看了完整的 log,确实还有一些error log > > > > > Enabling required built-in plugins > Linking flink-s3-fs-hadoop-1.14.2.jar to plugin directory >

Re: Flink UI 经常打不开作业的拓扑图

2022-02-28 Thread 胡伟华
Hi, xinzhuxiansheng 这个异常日志看起来不是根本原因,方便提供下全量 JobManager 日志吗? > 2022年2月28日 下午7:50,xinzhuxiansheng 写道: > > 我的环境: > Flink on Native K8s Application mode,并且用S3协议来做HA部署, 部署后,作业正常工作,可Flink UI > 无法正常打开 作业的拓扑图,页面会提示内部错误 > > > jobmanager会提示很多 类似的异常: > 2022-02-28 08:46:21,171 ERROR >

Re: Kafka order info to MySQL discard middle status and guarantee final correctness

2022-02-28 Thread 胡伟华
Hi, Lei Wang I think you could use a custom filter, the logic is as follows: 1. If the status is a final status, then return true 2. If the status is a final status, perform random sampling return true > 2022年2月28日 上午10:16,Lei Wang 写道: > > Receive order message from kafka, every message has a

Flink UI 经常打不开作业的拓扑图

2022-02-28 Thread xinzhuxiansheng
我的环境: Flink on Native K8s Application mode,并且用S3协议来做HA部署, 部署后,作业正常工作,可Flink UI 无法正常打开 作业的拓扑图,页面会提示内部错误 jobmanager会提示很多 类似的异常: 2022-02-28 08:46:21,171 ERROR org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] - Unhandled exception. java.util.concurrent.CancellationException: null

Re: flink状态恢复

2022-02-28 Thread yidan zhao
从理论上来说,要实现你想要的效果,那就需要在start的时候指定上次停止的任务的jobId。 否则flink无法知道你期望基于哪个任务的最后一次检查点继续。反正都是一个参数,为啥不指定ckpt路径或者savepoint路径呢? Guo Thompson 于2022年2月28日周一 16:51写道: > 要指定savepoint的目录,不然不会从历史状态恢复 > > enginegun 于2022年2月28日周一 16:38写道: > > > flink 重启任务 进行状态恢复是否必须指定checkpoint目录,能否默认读取最后一次生成的checkpoint 进行状态恢复? >

flink-connector-jdbc sink mysql是否存在乱序问题

2022-02-28 Thread Guo Thompson
入口: [image: image.png] 批量处理: [image: image.png] 刷盘: executeBatch按理来讲就是mysql的一个事务。 [image: image.png] *疑惑:从flush中可以看到,底层是分开了两个executeBatch,举一个例子:* *kafka里面消息从flink-cdc通过debizium采集出来,对update的mysql操作会对应两条消息(op:d,op:c),这时候如果d和c两条消息在不同的executor中,在不同的executeBatch,会不会导致乱序?最终丢数据??*

Fwd: flink-connector-jdbc sink mysql是否会存在乱序问题

2022-02-28 Thread Guo Thompson
-- Forwarded message - 发件人: Guo Thompson Date: 2022年2月28日周一 16:39 Subject: flink-connector-jdbc sink mysql是否会存在乱序问题 To: user-zh 入口: [image: image.png] 批量处理: [image: image.png] 刷盘: executeBatch按理来讲就是mysql的一个事务。 [image: image.png] *疑惑:从flush中可以看到,底层是分开了两个executeBatch,举一个例子:*

Re: flink状态恢复

2022-02-28 Thread Guo Thompson
要指定savepoint的目录,不然不会从历史状态恢复 enginegun 于2022年2月28日周一 16:38写道: > flink 重启任务 进行状态恢复是否必须指定checkpoint目录,能否默认读取最后一次生成的checkpoint 进行状态恢复?

flink状态恢复

2022-02-28 Thread enginegun
flink 重启任务 进行状态恢复是否必须指定checkpoint目录,能否默认读取最后一次生成的checkpoint 进行状态恢复?