Re: Re:回复:flink 从mysql读取数据异常

2021-03-30 Thread Robin Zhang
Hi,air23 JDBCTableSource就是batch模式的,不走实时。Flink解析执行计划时内部会去判断。 Best air23 wrote > 这边是想离线读取。不是走实时的 > 看到异常是 Only insert statement is supported now > > > > > > > > > > > > > > > > > > 在 2021-03-30 10:31:51,"guoyb" < > 861277329@ >> 写道: >>可以读取的,还有内置flink cdc

Re: flink sql count distonct 优化

2021-03-30 Thread Robin Zhang
Hi,guomuhua `The number of inputs accumulated by local aggregation every time is based on mini-batch interval. It means local-global aggregation depends on mini-batch optimization is enabled ` ,关于本地聚合,官网有这么一段话,也就是说,需要先开启批次聚合,然后才能使用本地聚合,加起来有三个参数.

Re: flink sql count distonct 优化

2021-03-30 Thread Robin Zhang
是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window >> > agg支持这个参数了。可以期待下。 >> > >> > Best, >> > Jark >> > >> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang >> >> > vincent2015qdlg@ >> >> > >> >

Re: flink sql count distonct 优化

2021-03-24 Thread Robin Zhang
Hi,guomuhua 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。 Best, Robin guomuhua wrote > 在SQL中,如果开启了 local-global 参数:set > table.optimizer.agg-phase-strategy=TWO_PHASE; > 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true; > set >

Re: Flink 消费kafka ,写ORC文件

2021-03-23 Thread Robin Zhang
Hi,Jacob 官网有这么一段:`我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner ` 链接: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D

Re: Flink On Yarn Per Job 作业提交失败问题

2021-02-24 Thread Robin Zhang
Hi,凌战 看看hadoop环境变量是否正确设置,可以参考文档 https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#preparation Best, Robin 凌战 wrote > hi,社区 > 在接口端设置用户为 hdfs 用户,在调度执行作业后,发现在/user/hdfs/.flink/application-id 目录下 存在相关包,如 > -rw-r--r-- 3 hdfs supergroup

Re: flink1.11的Streaming File Sink问题

2021-02-23 Thread Robin Zhang
Hi, op flink内部可以实现exactly-once语义,但是写到hdfs是至少一次的语义,如果任务失败重新启动会发生数据重复的问题,所以需要自己增加逻辑处理。 Best, Robin op wrote > 大家好: > 我想知道flink1.11的Streaming File > Sink保存流数据到hdfs支持exactly-once语义吗,官网好像没说,谢谢! -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink实时统计 结果波动时大时小

2021-02-17 Thread Robin Zhang
Hi,flink2021 首先看看业务场景,是否存在订单数据减少的情况,如果没有,就是逻辑或者代码有问题 Best, Robin flink2021 wrote > 我的数据源是kafka > 统计订单数据结果写入到mysql,发现在数据有积压的过程中,统计结果会忽大忽小?有人遇到过相关的问题没有呢?需要调整那些设置呢?(数据链路又点复杂,state > 使用rockdb报错,没有设置过期时间) > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Sent from:

Re: FlinkKafka Consumer can't dynamic discover the partition update

2021-02-17 Thread Robin Zhang
Hi,张云云 1. flink.partition-discovery.interval-millis 是kafka的一个配置参数,不知道你是不是通过kafkaProp设置的 2. 通过shell查看topic分区是否顺利增加,并且有数据写入。 Best, Robin 张云云 wrote > When start the job, occurs WARN log like below: > > WARN org.apache.kafka.clients.consumer.ConsumerConfig - The > configuration

Re: Flink 提交作业时的缓存可以删除吗

2021-02-02 Thread Robin Zhang
这个逻辑 > > 可以加一下日志看看实际是否触发,删除的是什么目录 > > Best, > tison. > > > Robin Zhang > vincent2015qdlg@ > 于2021年2月2日周二 下午2:37写道: > >> Flink 1.12下会将flink的依赖以及作业的jar包缓存在hdfs上,如下图: >> >> < >> http://apache-flink.147419.n8.nabble.com/file/t447/flink

Flink 提交作业时的缓存可以删除吗

2021-02-01 Thread Robin Zhang
Flink 1.12下会将flink的依赖以及作业的jar包缓存在hdfs上,如下图: 由于flink很早就开始使用了,这种目录越来越多,就算任务不在运行也不会自动清除。经过简单测试,直接删除后,不影响任务的运行以及简单的状态恢复。目前不知道会不会存在其他依赖,希望有清楚的能解释下这个的原理、作用以及能否删除。

FlinkSQL 1.10.0 where条件包含关键字列名的过滤条件不能使用=判断

2021-01-04 Thread Robin Zhang
测试代码如下: create view sink_test as select id ,type ,student_id ,kefu_id ,action_time ,action_user ,distribute_status ,unbind_type ,`comment` ,time_created ,pull_from from distribute_new_log where `comment` ='娃娃鱼'; print table sink_test;

Re: FlinkSQL 1.10 事件时间声明不能包含系统保留字

2020-12-29 Thread Robin Zhang
Hi,zilong 确实是bug,跟我的使用方式一样。感谢! zilong xiao wrote > 没记错这是一个bug,计算列中含有关键字会异常,可以看下这个issue: > https://issues.apache.org/jira/browse/FLINK-16068 > > Robin Zhang > vincent2015qdlg@ > 于2020年12月29日周二 下午6:56写道: > >> -- 定义时间非系统保留字为事件时间字段,能正常运行 >> create t

FlinkSQL 1.10 事件时间声明不能包含系统保留字

2020-12-29 Thread Robin Zhang
-- 定义时间非系统保留字为事件时间字段,能正常运行 create table events ( process_time bigint comment '事件时间', event string comment '事件类型', ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time/1000, '-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) with ( ... ... );

Re: Flink1.10 history server无法监控 FlinkSQL任务

2020-10-23 Thread Robin Zhang
Hi,yujianbo 只要任务结束,不管是cancel、failed、killed都会在history sever展示, 可以先去hdfs查看配置的目录是否存在任务相关的文件夹;也可以尝试重启一下history server试试。麻烦问一下,你的任务使用什么api写的,以及版本、提交方式? yujianbo wrote > 大佬,我发现我配置完后就只能看到完成的任务在history

Re: Flink1.10 history server无法监控 FlinkSQL任务

2020-10-23 Thread Robin Zhang
的作业不会显示的,你可以之间在 yarn 查看,history server 应该是只提供展示挂掉的作业 > > PS:另外提几个 history server 的问题 > > 1、挂掉的作业展示能否支持分页呢?目前直接在一个页面全部展示了历史所有的作业,打开会很卡 > > 2、有办法可以查看挂掉作业的 jm 和 tm 日志吗?因为 HDFS > 其实是有日志,按道理是可以拿到日志信息然后解析展示出来的,Spark history server 也是可以查看挂掉作业的日志 > > > Best! > zhisheng >

Flink1.10 history server无法监控 FlinkSQL任务

2020-10-22 Thread Robin Zhang
如下图,Flink 1.10 on yarn per job提交方式,如果是java datastream 以及table api开发的应用,能够被jm正常拉取统计信息,但是sql化的job没有办法被历史服务器监控。 使用的sql不完全是官网的,但是是经过转化为datastream,以on yarn per job方式提交到yarn运行的,只是多了个sql解析动作。不能理解 ,为什么历史服务器没有加载job信息到hdfs上的目标目录。查看jobmanager日志以及configuration都能确定jm加载到了历史服务器的相关配置。

Re: flinksql报错Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-20 Thread Robin Zhang
Hi,奔跑的小飞袁 目前没试过flink集成es,所以细节方面没办法深究太多,但是,可以给你提供个思路: 1. 查看pom中es的dependency是否设置了scope,导致依赖没有成功引入; 2. 如果依赖成功引入了,但是还不行,相反,在lib下放置相同的jar却可以正常执行,基本可以确定就是依赖冲突,具体什么类导致的,这个目前无法确定,期待更好地思路。 Best, Robin 奔跑的小飞袁 wrote > 现在我的lib下没有ElasticSearch相关的connector,在pom中引用,这样会产生冲突吗,还有这种现象有可能是在哪块冲突了

Re: 关于flink-sql count/sum 数据如何每天重新计算

2020-10-20 Thread Robin Zhang
Hi, 夜思流年梦 我理解按照日期分组就可以解决你的需求,流数据属于哪一天就只算当天的,不影响其他date的数据; 按天分组的数据都计算出来了,再汇总一下就是一个月的 Best, Robin 夜思流年梦 wrote > 现有此场景: > 计算每天员工的业绩(只计算当天的) > > > 现在我用flink-sql 的方式,insert into select current_date, count(1) ,worker from > XX where writeTime>=current_date group by worker; >

Re: flinksql报错Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-20 Thread Robin Zhang
Hi, 奔跑的小飞袁 Flink的class加载原则是child first,所以,尽量避免在pom中自己引入flink相关的依赖,避免跟Flink集群环境造成冲突,建议将安装包放在lib下,由flink去加载。 Best, Robin 奔跑的小飞袁 wrote > hello >

Re: 单任务多条流的逻辑报错

2020-10-20 Thread Robin Zhang
Hi, 根据报错内容,定位到你的代码在 at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at

Re: flink1.10 stop with a savepoint失败

2020-10-19 Thread Robin Zhang
Hi,zilong 的确是这个问题,感谢帮助。 Best, Robin zilong xiao wrote > Hi Robin Zhang > 你应该是遇到了这个issue报告的问题:https://issues.apache.org/jira/browse/FLINK-16626 > ,可以看下这个issue描述,祝好~ > > Robin Zhang > vincent2015qdlg@ > 于2020年10月19日周一 下午3:42写道: > >> 普通的source -> ma

Re: flink1.10 stop with a savepoint失败

2020-10-19 Thread Robin Zhang
Hi,Congxian 感谢提供思路,看了一下,JM端没有暴露日志,只能查看到ck正常的日志 Best, Robin Congxian Qiu wrote > Hi > 你可以看下 JM log 中这个 savepoint 失败是什么原因导致的,如果是 savepoint 超时了,就要看哪个 task > 完成的慢,(savepoint 可能比 checkpoint 要慢) > Best, > Congxian > > > Robin Zhang > vincent2015qdlg@ > 于20

flink1.10 stop with a savepoint失败

2020-10-19 Thread Robin Zhang
普通的source -> map -> filter-> sink 测试应用。 触发savepoint的脚本 : ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID} 具体报错信息: org.apache.flink.util.FlinkException: Could not stop with a savepoint job "81990282a4686ebda3d04041e3620776". at

Re: 回复: flink sql count问题

2020-09-29 Thread Robin Zhang
Hi lemon, 不是很理解你的疑问是什么,flink是事件驱动的,所以,来一条数据,就会被处理,走你的逻辑,就会产生一个结果,如果是第一次出现的key,只有一条数据,如果是状态中已经存在的key,会在控制台输出两条数据,一条true的是最终sink的结果。所以,每次输出一条结果有什么问题吗? Best, Robin lemon wrote >

Re: flink sql count问题

2020-09-29 Thread Robin Zhang
Hi lemon, 内部判断if函数可以替换为case when Best, Robin lemon wrote > 请教各位: > 我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录, > 之前在hive中是这么写的:count(if(name like '南京%',1 , null)),但是flink > sql中count不能为null,有什么别的方法能实现该功能吗? > 使用的是flink1.10.1 blink > -- Sent from:

Re: 如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 Thread Robin Zhang
s.apache.org/jira/browse/FLINK-19449 > > Robin Zhang > vincent2015qdlg@ > 于2020年9月29日周二 下午2:04写道: > >> 环境: flink 1.10,使用flinkSQL >> >> kafka输入数据如: >> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0} >

Re: Flink配置hdfs状态后端遇到的问题

2020-09-29 Thread Robin Zhang
Hi jester_jim, 配置文件中指定的checkpoints(以后简称ckp)的目录只是一个父目录,flink在首次触发每个job的ckp时会在这个父目录下新建多级文件夹,命名为指定的job名字/job id.所以,并不是新建父目录就可以,依然会存在权限问题 。 祝好,Robin Zhang Flink中文社区的各位大佬你们好: 本人是小白,由于对Flink不太了解,想学,然后搭了个Flink standalone(1.11.2 jdk1.8)集群,集群本身运行没有什么问题,作业运行也没什么问题。但是最近有用到状态后端,在配置hdfs的时候遇到了一个无法理解

如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 Thread Robin Zhang
环境: flink 1.10,使用flinkSQL kafka输入数据如: {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0} {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0} {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0} {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0} {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-09-25 Thread Robin Zhang
ocs-stable/dev/stream/state/schema_evolution.html祝好唐云________From: > Robin Zhang > vincent2015qdlg@ > Sent: Wednesday, July 15, 2020 16:23To: > user-zh@.apache > > user-zh@.apache > Subject: Re: flink 1.9.2 升级 1.10.0 > 任务失败不能从checkpoint恢复

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-15 Thread Robin Zhang
据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑 Best Robin Zhang From: Peihui He <[hidden email]> Sent: Tuesday, July 14, 2020 10:42 To: [hidden email] <[hidden email]> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 hello, 当升级到1.10.0 时候,程序出错后会尝试从che

Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-14 Thread Robin Zhang
我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by 和多次 流表的关联 。 代码如下: tEnv.getConfig() .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),

Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-14 Thread Robin Zhang
没有窗口,就简单的表join,有kafka流表 ,kudu维表,使用了group by > Jul 14, 2020; 12:36pm — by zhisheng zhisheng > 有没有窗口啊? Robin Zhang <[hidden email]> 于2020年7月14日周二 上午11:48写道: > <http://apache-flink.147419.n8.nabble.com/file/t447/ttl.png> > 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 grou

Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 Thread Robin Zhang
没有使用窗口呢,就多表关联,涉及到流表join流表,流表join维表,group by 、topN等 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 Thread Robin Zhang
我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by 和多次 流表的关联 。 代码如下: tEnv.getConfig() .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),