flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口
flink版本 flink1.11 flink sql连接kafka create table kafka_table ( log_id string, event_time bigint, process_time as PROCTIME(), ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)), watermark for ts as ts - interval '1' second ) with ( 'connector' = 'kafka', 'topic' = 'kafka_table', 'properties.bootstrap.servers' = '10.2.12.3:9092', 'properties.group.id' = 'tmp-log-consumer003', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) 使用窗口聚合的代码 val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group by HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' SECOND),kafka_table.src_ip") 相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发, 系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的 求问是什么原因不能触发窗口或者我的用法有什么问题吗
Re: 邮件退订
Hi, 退订需要发邮件到 user-zh-unsubscr...@flink.apache.org 可以参考 https://flink.apache.org/zh/community.html#section-1 Best, Shawn Huang wangleigis 于2020年11月14日周六 上午11:55写道: > > > > > > > 退订 > > > > > > > > > > > -- > > 祝:工作顺利,完事如意!
邮件退订
退订 -- 祝:工作顺利,完事如意!
回复: Re:回复: flink-1.11.2 执行checkpoint失败
谢谢回复,我看了下我的任务,是背压导致的checkpoint超时,超时是没有异常日志打印的,每超时一次 就打印 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. 这里的failure counter是 checkpoint的count。 任务是没有挂的,只是一直背压了。 发件人: hailongwang <18868816...@163.com> 发送时间: 2020年11月13日 6:13 收件人: user-zh@flink.apache.org 主题: Re:回复: flink-1.11.2 执行checkpoint失败 据我所知,“超时”并不会导致 failure counter 加 1,也就是说“超时”并不是“错误”,或者说 Exception。 我觉得是否可以看下 checkpoint 抛了什么 exception 导致超过了最大可容能的数量(默认应该是有异常就会重启) 如果这个 Exception 是期望的或者因为 HDFS 等原因无法避免的话,那么可以适当加大 tolerableCpFailureNumber。 在 2020-11-13 09:13:34,"史 正超" 写道: >这是个思路,谢谢回复,我先试下。 > >发件人: 赵一旦 >发送时间: 2020年11月13日 2:05 >收件人: user-zh@flink.apache.org >主题: Re: flink-1.11.2 执行checkpoint失败 > >如果超时也是错误的话,推进你设置容忍数量为INT最大值。因为有时候检查点失败或超时并不真正代表反压,至少我的生产中是这样。 >有部分情况,压力高,但刚刚好的情况下,会出现部分检查点失败的case,但实际压力刚刚能顶住。没必要因此导致任务失败。 > >史 正超 于2020年11月13日周五 上午10:01写道: > >> 从上面看是的。 >> >> public void handleJobLevelCheckpointException(CheckpointException >> exception, long checkpointId) { >>checkFailureCounter(exception, checkpointId); >>if (continuousFailureCounter.get() > tolerableCpFailureNumber) { >> clearCount(); >> failureCallback.failJob(new FlinkRuntimeException("Exceeded >> checkpoint tolerable failure threshold.")); >>} >> } >> >> 大于阈值就报那个错了。 >> >> 发件人: 赵一旦 >> 发送时间: 2020年11月13日 1:56 >> 收件人: user-zh@flink.apache.org >> 主题: Re: flink-1.11.2 执行checkpoint失败 >> >> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable >> failure threshold. >> 顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢? >> >> 史 正超 于2020年11月12日周四 下午9:23写道: >> >> > 执行checkpoint失败,报下面的错。 >> > 2020-11-12 21:04:56 >> > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint >> tolerable >> > failure threshold. >> > at >> > >> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) >> > at >> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673) >> > at >> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650) >> > at >> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91) >> > at >> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783) >> > at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> > at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> > at >> > >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >> > at >> > >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >> > at >> > >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> > at >> > >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> > at java.lang.Thread.run(Thread.java:745) >> > >>
Flink cdc 多表关联处理延迟很大
我用flink cdc 对mysql的表进行关联查询,发现flink只能两张表关联之后再跟下一张表关联,导致最后落库的延迟非常大。 有没有比较好的优化方案能缓解这样的问题?
Re:确认订阅user-zh@flink.apache.org
Hi, 订阅中文用户邮件需发送至 user-zh-subscr...@flink.apache.org , 更多详细情况可以参考[1] [1] https://flink.apache.org/community.html#mailing-lists Best, hailong At 2020-11-13 17:12:40, "黑色" wrote:
????????user-zh@flink.apache.org
Re: pyflink 1.11 运行pyflink作业时报错
看起来是的,找不到JAVA_HOME,显式export一下JAVA_HOME试试? > 在 2020年11月13日,下午5:06,whh_960101 写道: > > Hi,各位大佬,pyflink 1.11 运行pyflink作业时报错pyflink/pyfink_gateway_server.py 193行 > lauch_gateway_server_process()217行 return Popen()FileNotFoundError: [Error 2] > No such file or directory: 'java' : 'java'感觉像找不到java路径还是什么意思,java:java没看懂 > ,该怎么解决 > > >
pyflink 1.11 运行pyflink作业时报错
Hi,各位大佬,pyflink 1.11 运行pyflink作业时报错pyflink/pyfink_gateway_server.py 193行 lauch_gateway_server_process()217行 return Popen()FileNotFoundError: [Error 2] No such file or directory: 'java' : 'java'感觉像找不到java路径还是什么意思,java:java没看懂 ,该怎么解决
Re: flink-1.11 使用 application 模式时 jobid 问题
看完还是没有解决方案啊 JasonLee <17610775...@163.com> 于2020年11月13日周五 下午4:10写道: > hi > 可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg > > > > - > Best Wishes > JasonLee > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: flink本地编译源码失败
修改为这个源之后可以编译下来吗 有人测过吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-1.11 使用 application 模式时 jobid 问题
hi 可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
?????? flink 1.11.2 ????????blink????????BatchTableEnvironment????
BatchTableEnvironmenttable to dataset; dataset to table ---- ??: "user-zh"