flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口

2020-11-13 文章 李世钰
flink版本 flink1.11


flink sql连接kafka
create table kafka_table (
log_id string,
event_time bigint,
process_time as PROCTIME(),
ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)),
watermark for ts as ts - interval '1' second
) with (
'connector' = 'kafka',
'topic' = 'kafka_table',
'properties.bootstrap.servers' = '10.2.12.3:9092',
'properties.group.id' = 'tmp-log-consumer003',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)





使用窗口聚合的代码
val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL '10' 
SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10' SECOND, 
INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group by 
HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' 
SECOND),kafka_table.src_ip")


相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发,
系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的
求问是什么原因不能触发窗口或者我的用法有什么问题吗

Re: 邮件退订

2020-11-13 文章 Shawn Huang
Hi,

退订需要发邮件到  user-zh-unsubscr...@flink.apache.org

可以参考 https://flink.apache.org/zh/community.html#section-1

Best,
Shawn Huang


wangleigis  于2020年11月14日周六 上午11:55写道:

>
>
>
>
>
>
> 退订
>
>
>
>
>
>
>
>
>
>
> --
>
> 祝:工作顺利,完事如意!


邮件退订

2020-11-13 文章 wangleigis






退订










--

祝:工作顺利,完事如意!

回复: Re:回复: flink-1.11.2 执行checkpoint失败

2020-11-13 文章 史 正超
谢谢回复,我看了下我的任务,是背压导致的checkpoint超时,超时是没有异常日志打印的,每超时一次 就打印 
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
这里的failure counter是 checkpoint的count。
任务是没有挂的,只是一直背压了。


发件人: hailongwang <18868816...@163.com>
发送时间: 2020年11月13日 6:13
收件人: user-zh@flink.apache.org 
主题: Re:回复: flink-1.11.2 执行checkpoint失败

据我所知,“超时”并不会导致 failure counter 加 1,也就是说“超时”并不是“错误”,或者说 Exception。
我觉得是否可以看下 checkpoint 抛了什么 exception 导致超过了最大可容能的数量(默认应该是有异常就会重启)
如果这个 Exception 是期望的或者因为 HDFS 等原因无法避免的话,那么可以适当加大 tolerableCpFailureNumber。

在 2020-11-13 09:13:34,"史 正超"  写道:
>这是个思路,谢谢回复,我先试下。
>
>发件人: 赵一旦 
>发送时间: 2020年11月13日 2:05
>收件人: user-zh@flink.apache.org 
>主题: Re: flink-1.11.2 执行checkpoint失败
>
>如果超时也是错误的话,推进你设置容忍数量为INT最大值。因为有时候检查点失败或超时并不真正代表反压,至少我的生产中是这样。
>有部分情况,压力高,但刚刚好的情况下,会出现部分检查点失败的case,但实际压力刚刚能顶住。没必要因此导致任务失败。
>
>史 正超  于2020年11月13日周五 上午10:01写道:
>
>> 从上面看是的。
>>
>> public void handleJobLevelCheckpointException(CheckpointException
>> exception, long checkpointId) {
>>checkFailureCounter(exception, checkpointId);
>>if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
>>   clearCount();
>>   failureCallback.failJob(new FlinkRuntimeException("Exceeded
>> checkpoint tolerable failure threshold."));
>>}
>> }
>>
>> 大于阈值就报那个错了。
>> 
>> 发件人: 赵一旦 
>> 发送时间: 2020年11月13日 1:56
>> 收件人: user-zh@flink.apache.org 
>> 主题: Re: flink-1.11.2 执行checkpoint失败
>>
>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
>> failure threshold.
>> 顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢?
>>
>> 史 正超  于2020年11月12日周四 下午9:23写道:
>>
>> > 执行checkpoint失败,报下面的错。
>> > 2020-11-12 21:04:56
>> > org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
>> tolerable
>> > failure threshold.
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
>> > at
>> >
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
>> > at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> > at
>> >
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> > at
>> >
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> > at
>> >
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> > at
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>>


Flink cdc 多表关联处理延迟很大

2020-11-13 文章 丁浩浩
我用flink cdc 对mysql的表进行关联查询,发现flink只能两张表关联之后再跟下一张表关联,导致最后落库的延迟非常大。
有没有比较好的优化方案能缓解这样的问题?

Re:确认订阅user-zh@flink.apache.org

2020-11-13 文章 hailongwang
Hi,
   订阅中文用户邮件需发送至 user-zh-subscr...@flink.apache.org , 更多详细情况可以参考[1]
[1] https://flink.apache.org/community.html#mailing-lists


Best,
hailong
At 2020-11-13 17:12:40, "黑色"  wrote:


????????user-zh@flink.apache.org

2020-11-13 文章 ????


Re: pyflink 1.11 运行pyflink作业时报错

2020-11-13 文章 Dian Fu
看起来是的,找不到JAVA_HOME,显式export一下JAVA_HOME试试?

> 在 2020年11月13日,下午5:06,whh_960101  写道:
> 
> Hi,各位大佬,pyflink 1.11 运行pyflink作业时报错pyflink/pyfink_gateway_server.py 193行 
> lauch_gateway_server_process()217行 return Popen()FileNotFoundError: [Error 2] 
>  No such file or directory: 'java' : 'java'感觉像找不到java路径还是什么意思,java:java没看懂 
> ,该怎么解决
> 
> 
> 



pyflink 1.11 运行pyflink作业时报错

2020-11-13 文章 whh_960101
Hi,各位大佬,pyflink 1.11 运行pyflink作业时报错pyflink/pyfink_gateway_server.py 193行 
lauch_gateway_server_process()217行 return Popen()FileNotFoundError: [Error 2]  
No such file or directory: 'java' : 'java'感觉像找不到java路径还是什么意思,java:java没看懂 ,该怎么解决



 

Re: flink-1.11 使用 application 模式时 jobid 问题

2020-11-13 文章 zhisheng
看完还是没有解决方案啊

JasonLee <17610775...@163.com> 于2020年11月13日周五 下午4:10写道:

> hi
> 可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink本地编译源码失败

2020-11-13 文章 leiyanrui
修改为这个源之后可以编译下来吗 有人测过吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-1.11 使用 application 模式时 jobid 问题

2020-11-13 文章 JasonLee
hi
可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


?????? flink 1.11.2 ????????blink????????BatchTableEnvironment????

2020-11-13 文章 Asahi Lee
BatchTableEnvironmenttable to dataset; dataset to table




----
??: 
   "user-zh"