SQL null 过滤的问题

2021-08-11 文章 silence
flink 版本:1.12 列:col varchar 使用where col is null时可以过滤出col为null的记录 使用where col is null or col = ''时就不可以 同时试了下另外一种写法 where (case when col is null then true else false end) 可以过滤出来 where (case when col is null then true when col = '' then true else false end) 过滤不出来 请问这个bug吗,还是语法有问题

回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 文章 silence
用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726 不行的话可以在ddl中限制列的数量 -- 发件人:Ye Chen 发送时间:2021年8月2日(星期一) 11:37 收件人:user-zh ; silence 主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? 你好,我试了一下,如

回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 文章 silence
如果只想更新部分字段的话可以试下 insert into t(a,b) select a,b from x -- 发件人:Ye Chen 发送时间:2021年7月30日(星期五) 17:57 收件人:user-zh 主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? 现有table CREATE TABLE t ( abigint, b

回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 文章 silence
你在你的sink ddl定义了主键会自动的按主键进行upsert的 参考https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#idempotent-writes -- 发件人:Ye Chen 发送时间:2021年7月30日(星期五) 17:57 收件人:user-zh 主 题:场景题:Flink SQL 不支持 INSERT

回复:回复:flink sql 依赖隔离

2021-07-25 文章 silence
就是单独引用的啊,但任务逻辑比较复杂时会同时混用多个udf这个是没法避免的啊 -- 发件人:Michael Ran 发送时间:2021年7月23日(星期五) 17:42 收件人:user-zh ; silence 主 题:Re:回复:flink sql 依赖隔离 建议上传的时候单独放,提交任务的时候 拉下来单独引用 在 2021-07-23 11:01:59,"silence" 写道: > >这边目前主要还是yarn,

回复:flink sql 依赖隔离

2021-07-22 文章 silence
这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载 udf和sql jar之间、udf和udf之间都可能会有依赖冲突, 目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突 -- 发件人:Michael Ran 发送时间:2021年7月22日(星期四) 20:07 收件人:user-zh ; silence

回复:flinksql问题请教

2021-07-06 文章 silence
已解决 where 条件始终为假。 -- 发件人:silence 发送时间:2021年7月7日(星期三) 12:05 收件人:user-zh 主 题:flinksql问题请教 请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join flink版本:1.12.2 Stage 1 : Data Source content : Source: Values

flinksql问题请教

2021-07-06 文章 silence
请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join flink版本:1.12.2 Stage 1 : Data Source content : Source: Values(tuples=[[]]) Stage 2 : Operator content : Correlate(invocation=[LateralArray($cor3.gift_list)], correlate=[table(LateralArray($cor3.gift_list))],

回复:flink sql 依赖隔离

2021-07-05 文章 silence
没用放在lib下,是启动时通过-C动态添加udf jar,一个sql作业可能会用到很多udf,可能是不同的用户写的,所以经常会出现依赖冲突 -- 发件人:yzhhui 发送时间:2021年7月5日(星期一) 14:09 收件人:user-zh@flink.apache.org ; silence 抄 送:user-zh 主 题:回复:flink sql 依赖隔离 提交任务的时候提交自己的jar就好了,这个不要放公共lib下 就OK 在2021年

flink sql 依赖隔离

2021-07-05 文章 silence
请教大家目前flink sql有没有办法做到依赖隔离 比如connector,format,udf(这个最重要)等, 很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。 目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划

回复:普通表join版本表,怎么得到append表

2021-06-30 文章 silence
目前interval join和维表的时态join不会进行回撤,其他场景会产生回撤数据 -- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 17:47 收件人:user-zh@flink.apache.org 主 题:普通表join版本表,怎么得到append表 大佬们,请教个问题, insert into sink_2 select a.`time`,c.cust,b.mobile from case2_TOPIC_A a left

回复:flink sql 空闲数据源场景如何配置

2021-06-30 文章 silence
可参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout -- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 10:54 收件人:user-zh@flink.apache.org 主 题:flink sql 空闲数据源场景如何配置 在代码中可以通过

Serializer consumed more bytes than the record had

2021-06-11 文章 silence
flink 版本1.12 异常如下: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0} at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166) at

Re: 如何统计n小时内,flink成功从kafka消费的数据量?

2021-05-18 文章 silence
可以在metrics 上报时或落地前对source两次上报间隔的numRecordsOut值进行相减,最后呈现的时候按时间段累计就可以了 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Table-api sql 预检查

2021-04-29 文章 silence
可以用explain -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql如何从远程加载jar包中的udf

2021-03-11 文章 silence
启动时通过-C加到classpath里试试 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 提交两个SQL任务,其中一个不生效。

2021-03-10 文章 silence
多个insert的话要用statementset去提交 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-09 文章 silence
个人也维护了个flink平台的开源项目,希望可以帮助到你 https://github.com/hairless/plink -- Sent from: http://apache-flink.147419.n8.nabble.com/

[sql]TimeStamp和异常格式的字符串进行比较时会报空指针

2021-03-05 文章 silence
问题描述: TimeStamp类型和异常格式的字符串进行比较时会在任务运行时报空指针 像这种错误虽然是用户书写错误导致的,但运行时才能发现问题,且sql太长时不好定位具体原因 是否可以在编译期进行类型的验证,尽早发现问题并给出sql的文本坐标 例:where CURRENT_TIMESTAMP='' where CURRENT_TIMESTAMP='19700101' java.lang.NullPointerException: null at

Re: 通过普通ddl来读写hive

2021-02-23 文章 silence
那用自定义的catalog怎么定义hive表来读写hive呢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 通过普通ddl来读写hive

2021-02-23 文章 silence
我理解各个公司都会有自己的元数据管理平台,hive表的创建修改都需要经过严格的权限控制在平台上进行操作,包括调度任务、实时写入任务、数据血缘等。 我个人觉得理想的方式是单个flink sql的所有的connector通过自维护的元数据进行生成,不需要引入hivecatalog,使用默认的MemoryCatalog即可。 总结一下就是不希望引入HiveCatalog来进行hive表的读写 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 通过普通ddl来读写hive

2021-02-23 文章 silence
你好 感谢回复 主要有以下几点原因: 1、直接使用hive catalog进行hive表的创建修改风险太高,更希望在平台层限制hive表的创建和修改 2、connector的配置是保存在hive表的DBPROPERTIES里的,这是否就意味着想通过flink往现有hive表里写数据需要先通过alter语句修改hive表的属性配置,这里不希望对用户直接暴露alter hive的能力 3、使用普通的ddl可以与现有connector的定义统一风格,不需要来回切换方言 4、可以不用将配置信息持久化,通过GenericInMemoryCatalog使用即可 -- Sent from:

通过普通ddl来读写hive

2021-02-22 文章 silence
问一下社区有没有计划支持普通的ddl(不用hive的catalog)来进行读写hive表吗 现在不支持是有什么考虑吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.12 不能同时在一个工程消费jdbc和kafka CDC数据

2021-02-18 文章 silence
可以尝试在shade插件里加个transformer -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-1.12 通过-t指定模式后无法指定yarn参数

2021-01-27 文章 silence
flink1.12后所有的yarn相关的参数通过-D进行指定 例:-D yarn.application.name=xxx 替代以前的-ynm xxx 更多配置参考文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 请教关于Flink yarnship的使用

2021-01-22 文章 silence
你可以尝试同时指定-C "file:///path/to/conf/cmp_online.cfg" 以及 -yt /path/to/conf 来进行测试 然后代码里这么获取this.getClass().getResourceAsStream("cmp_online.cfg") -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 数组下标问题

2020-12-31 文章 silence
flink sql官方文档中数组的取值方式如下定义 array ‘[’ integer ‘]’ Returns the element at position integer in array. The index starts from 1. 参考链接 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#collection-functions 主要问题就是数组的下标是从1开始的,这不符合数组从0开始的常识,也和hive

Re: yarn application模式提交任务失败

2020-12-20 文章 silence
应该是-D不是-yD -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-shaded-hadoop-2-uber*-* 版本确定问题

2020-12-13 文章 silence
flink已经不建议将hadoop的jar放到lib里了 可以通过 export HADOOP_CLASSPATH=`hadoop classpath` 加载hadoop的依赖 参考链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink使用多个keytab

2020-12-06 文章 silence
这个问题我们也遇到过,目前这个issue在跟进,https://issues.apache.org/jira/browse/FLINK-12130 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-06 文章 silence
可以用string -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 文章 silence
你好: 这个原因最开始已经说明了,main jar就是将传入的sql参数进行解析封装,而sql里用到的udf、connector之类的类型希望可以做到动态指定 一方面可以做到灵活的依赖控制,减少main jar的大小 另一方吧可以减少不同connector和udf,或不同版本connector和udf的依赖冲突的可能性 ps:假如平台有数十种connector和数百个udf都打到一个fast jar里想想都觉得不太优雅吧 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 文章 silence
看了很多同学回复yarn的解决方案 我这再补充一下: 还是希望可以提供更通用的submit参数来解决此问题, 包括提交到standalone集群时可以额外指定本地依赖jar 有没有cli相关的同学可以跟进下建议 谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:请教一下目前flink submit能不能指定额外的依赖jar

2020-11-05 文章 silence
感谢回复,还是希望可以从submit上解决这个问题,不能添加依赖限制了很多应用场景,特别是针对平台来说 -- Sent from: http://apache-flink.147419.n8.nabble.com/

请教一下目前flink submit能不能指定额外的依赖jar

2020-11-05 文章 silence
大家好 由于目前用了flink SQL封装了jar包,sql是作为参数动态传入的, 因此需要动态的引入一下依赖jar,比如udf jar,connector的jar等, 由于不同任务的依赖jar是不同的,不适合都放在flink lib目录下(可能会冲突) 因此想请教一下有没有办法在submit时才指定任务依赖的jar包,类似spark submit的--jars 没有的话有没有相关的issue可以跟进这个问题 谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink任务挂掉后自动重启

2020-11-01 文章 silence
说一下我们平台的实现方式 1、自定义metricReporter,假如任务开启了checkpoint,reporter会自动的将最新完成的checkpoint路径进行上报 可参考https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#checkpointing 2、平台会有是否重试和是否基于checkpoint进行恢复的选项 3、假如上述两选项都开启了之后,可以对运行失败的任务基于最新的checkpoint进行拉起 -- Sent from:

Re: 官方后续会有支持kafka lag metric的计划吗

2020-10-28 文章 silence
hi zhisheng 我找到两篇相关的参考博客你看一下 https://blog.csdn.net/a1240466196/article/details/107853926 https://www.jianshu.com/p/c7515bdde1f7 -- Sent from: http://apache-flink.147419.n8.nabble.com/

官方后续会有支持kafka lag metric的计划吗

2020-10-28 文章 silence
目前消费kafka会有lag的情况发生,因此想基于flink metric进行上报监控kakfa的消费延时情况 主要是两种情况: 1、当前group消费的offset和对应topic最大offset之间的差值,也就是积压的数据量 2、当前消费的最新记录的timestamp和系统时间之间的差值,也就是消费的时间延时 kafka lag的监控对实时任务的稳定运行有着非常重要的作用, 网上也检索到了一些基于源码修改的实现,但修改源码的话也不利于后面flink版本的升级,还是希望官方可以考虑支持一下 -- Sent from:

Re: Flink 1.11里如何parse出未解析的执行计划

2020-10-21 文章 silence
; import org.apache.flink.sql.parser.validate.FlinkSqlConformance; /** * @author: silence * @date: 2020/10/22 */ public class Test { public static void main(String[] args) throws SqlParseException { String sql = "xxx"; SqlParser.Config sqlParserConfig =

Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-27 文章 silence
也可以通过普通的非窗口聚合进行实现吧,minibatch设大点 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: [SQL] parse table name from sql statement

2020-09-21 文章 silence
我写过一个类似的可以参考一下 private static List lookupSelectTable(SqlNode sqlNode) { List list = new ArrayList<>(); if (sqlNode instanceof SqlSelect) { SqlNode from = ((SqlSelect) sqlNode).getFrom(); list.addAll(lookupSelectTable(from)); } else if (sqlNode

Re: [SQL] parse table name from sql statement

2020-09-21 文章 silence
写过一个类似的可以参考一下 private static List lookupSelectTable(SqlNode sqlNode) { List list = new ArrayList<>(); if (sqlNode instanceof SqlSelect) { SqlNode from = ((SqlSelect) sqlNode).getFrom(); list.addAll(lookupSelectTable(from)); } else if (sqlNode

Re: flink-CDC client 一对多问题

2020-09-21 文章 silence
可以写一个group_array的udaf select * from aa as a left join ( select userId,group_array(row(userId, userBankNo, userBankNo)) from bb group by userId ) as b where a.userId=b.userId -- Sent from: http://apache-flink.147419.n8.nabble.com/

请教大家如何注册支持多返回值类型的UDAF

2020-09-16 文章 silence
如题,最近想实现一些类似于LAST_VALUE之类的UDAF,看了官网文档自己写了一下目前有以下一些疑问: 1、聚合结果需要重写AggregateFunction的getValue方法,而该方法需要返回固定的数据类型,如果要实现不同返回值的UDAF是否需要进行多个实现? ​2、如果是需要多个实现类的话如何注册到同一个方法名上?测试发现后注册的UDAF会覆盖之前的注册,也就是只有最后注册的UDAF生效,还是只能支持一种数据类型

Re: flink sql执行sql语句无法执行的错误-No operators defined in streaming topology. Cannot execute.

2020-09-11 文章 silence
没有insert语句也就是没有sink无法触发计算 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-10 文章 silence
个人理解有几种实现方案 1、通过主键加LAST_VALUE()使用最新的记录进行计算 2、通过flink-cdc connector source 3、自己根据操作类型写计算逻辑 -- Sent from: http://apache-flink.147419.n8.nabble.com/

回复: flink-sql 1.11版本都还没完全支持checkpoint吗

2020-09-08 文章 silence
手动停止再恢复的话需要启动时通过 (-s 上一次checkpoint的mate路径)进行恢复 https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/check points.html#resuming-from-a-retained-checkpoint -邮件原件- 发件人: 凌天荣 <466792...@qq.com> 发送时间: 2020年9月8日 15:50 收件人: user-zh 主题: flink-sql 1.11版本都还没完全支持checkpoint吗