Re: computed column转为timestamp类型后进行窗口聚合报错
有木有尝试补充 watermark 语法 jun su 于2020年12月11日周五 上午10:47写道: > hi all, > > flink 1.11.0版本, 使用computed column将long类型字段转为timestamp后进行时间窗口聚合,报如下错误: > > ddl: > > CREATE TABLE source( > occur_time BIGINT, > rowtime AS longToTimestamp(occur_time) > ) WITH ('connector' = 'filesystem','format' = 'orc','path' = > '/path/to/data') > > 报错信息: > > Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input > fields are: [occur_time] > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402) > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385) > at > > org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720) > at > > org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161) > at > > org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111) > at > > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217) > ... 27 more > > -- > Best, > Jun Su >
Re: flink 1.12如何使用RateLimiter
您好 请问是什么场景呢 ?限速的目的是什么 ? 18757141558 <18757141...@163.com> 于2020年12月9日周三 下午6:49写道: > 在源码中找到 FlinkConnectorRateLimiter 和 GuavaFlinkConnectorRateLimiter > kafka相关的类中没有找到这些配置 > 请问如何在api中使用RateLimiter(不修改源码方式)
Re: Flink sql 无法用!=
是的 <> 是 SQL 标准推荐的用法。 jindy_liu <286729...@qq.com> 于2020年11月16日周一 下午2:24写道: > 用<> > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
DataSet 已经是社区准备 deprecate 的 API 了,不建议再使用。1.12 版本后推荐统一使用 DataStream,使用 sqlQuery 接口拿到 table 对象后转成 DataStream。 Asahi Lee <978466...@qq.com> 于2020年11月13日周五 下午4:05写道: > BatchTableEnvironment对象可以进行table to dataset; dataset to table > > > > > --原始邮件-- > 发件人: > "user-zh" > < > danny0...@apache.org; > 发送时间:2020年11月10日(星期二) 下午2:43 > 收件人:"user-zh" > 主题:Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象 > > > > 拿 BatchTableEnvironment 对象作什么用处呢 ? 我们有 TableEnvironmentInternal 但是不推荐使用。 > > Asahi Lee <978466...@qq.com 于2020年11月9日周一 下午5:09写道: > > 是的,BatchTableEnvironment 对象 > > > > > --nbsp;原始邮件nbsp;-- > 发件人: > > "user-zh" > > < > danny0...@apache.orggt;; > 发送时间:nbsp;2020年11月9日(星期一) 中午12:34 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象 > > > > gt; > gt; BatchTableEnvironment 环境 > > > 是说nbsp; BatchTableEnvironment 对象吗 > > Asahi Lee <978466...@qq.comgt; 于2020年11月9日周一 上午10:48写道: > > gt; 你好! > gt; amp;nbsp; amp;nbsp; amp;nbsp; 我使用的是flink > 1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取: > gt; // ** // BLINK BATCH QUERY // > ** > import > gt; org.apache.flink.table.api.EnvironmentSettings; import > gt; org.apache.flink.table.api.TableEnvironment; > EnvironmentSettings > bbSettings > gt; = > gt; > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > gt; TableEnvironment bbTableEnv = > gt; > > TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?
Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?
如果 gap 溢出只是少数 record 满足,用 window 性能确实不好,可以考虑用传统的 kv 记录状态 Lei Wang 于2020年11月12日周四 下午9:17写道: > 用 session windown 确实能满足功能: > > > robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, > y) -> y); > > 按照这种写法, 我理解 window state 中只保存了最近的一条记录。 > > > 正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。 > > > > On Thu, Nov 12, 2020 at 5:25 PM hailongwang <18868816...@163.com> wrote: > > > > > > > > > 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。 > > 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。 > > > > > > > > > > 在 2020-11-12 14:34:59,"Danny Chan" 写道: > > >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 > > > > > >Lei Wang 于2020年11月11日周三 下午2:03写道: > > > > > >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > > >> > > >> 比如 > > >> robot1 2020-11-11 12:00:00 msginfo > > >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 > > 就发出报警呢? > > >> > > >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? > > >> > > >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 > > >> > > >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 > > >> 我们必须 按 robotId 做 keyBy > > >> > > >> 求大神指教。 > > >> > > >> 谢谢, > > >> 王磊 > > >> > > >
Re: Flink sql查询NULL值错误
是的 Flink SQL 现在还不支持隐式类型,需要手动设置 NULL 的类型 SQL 才能通过编译。 丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午8:52写道: > 感谢大佬!!! > > > 在 2020年11月10日,下午8:22,hailongwang <18868816...@163.com> 写道: > > > > Hi, > > > > > > 需要将 null cast 成某个具体的值,比如: > > if(type=1,2,cast(null as int)) > > > > > > Best, > > Hailong > > 在 2020-11-10 19:14:44,"丁浩浩" <18579099...@163.com> 写道: > >> Select > >> id, > >> name, > >> if(type=1,2,null) > >> From > >> user ; > >> 当我执行上面的sql的时候提示我 > >> [ERROR] Could not execute SQL statement. Reason: > >> org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of > ‘NULL’ > >> 是无法将null展示吗? > > > > > > > > >
Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?
感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 Lei Wang 于2020年11月11日周三 下午2:03写道: > 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > > 比如 > robot1 2020-11-11 12:00:00 msginfo > 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 就发出报警呢? > > flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? > > 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 > > 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 > 我们必须 按 robotId 做 keyBy > > 求大神指教。 > > 谢谢, > 王磊 >
Re: Flink SQL传递性
创建 view ? amen...@163.com 于2020年11月10日周二 下午3:28写道: > hi everyone, > > Flink SQL有没有上一个SQL的输出是下一个SQL的输入的业务场景思路? > 比如说KafkaSource -> SQL_1 -> SQL_2 -> MysqlSink,一整个链起来,作为一个任务提交运行~ > > best, > amenhub >
Re: Flink sql tinyint类型使用in 报错
好的 了解 丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午3:19写道: > 就比如我要用flink cdc 接入mysql表,一般都是直接将mysql表的DDL稍加修改然后在flink sql中创建,一般都不会考虑到 > 类型转换的问题就直接沿用mysql中的类型,当然sql也是一样的。同样的sql在满足语法正确性的情况下,mysql中能跑,而flinksql中无法跑, > 当然可以通过显示类型转化来完成,但是能提供自动转化会更好的提供易用性。 > > > 在 2020年11月10日,下午2:51,Danny Chan 写道: > > > > 暂时还没有 你们是什么场景需要用到隐式类型 > > > > 丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午2:45写道: > > > >> 请问有没有计划加入隐式类型自动转化呢 > >> > >>> 在 2020年11月10日,下午2:35,Jark Wu 写道: > >>> > >>> 是的。Flink 目前还不支持隐式类型转换。 > >>> > >>> Best, > >>> Jark > >>> > >>> On Tue, 10 Nov 2020 at 14:28, hailongwang <18868816...@163.com> wrote: > >>> > >>>> Hi, > >>>> > >>>> > >>>> 从你的报错来看,是 in 不支持隐式 CAST。 > >>>> 你要么可以把 type 定义成 INT,要不把后面的值 CAST 成 TINYINT。 > >>>> > >>>> > >>>> Best, > >>>> Hailong Wang > >>>> > >>>> 在 2020-11-10 10:41:47,"丁浩浩" <18579099...@163.com> 写道: > >>>>> 我使用flink sql cdc取连接 mysql表的时候,当我的mysql表type这个字段类型是tinyint类型时 使用type > >>>> in(1,2,3,4,5)会报以下的错误,只有当我把字段类型改成int的时候才能使用in,这是符合预期的吗,当字段类型不匹配的时候 > flink > >>>> sql不会自动转换类型吗? > >>>>> > >>>>> [ERROR] Could not execute SQL statement. Reason: > >>>>> org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No > >>>> applicable constructor/method found for actual parameters "int"; > >> candidates > >>>> are: "org.apache.flink.table.runtime.util.collections.ByteHashSet() > >>>> > >> > >> > >> > > >
Re: SQL从1.9迁移到1.11的问题
通过 Table 操作流程的 DAG 现在不再会缓存到底层的 exec env 中,为了避免 transformations 污染,所以是拿不到的,但是内部代码我们仍然是先拼接 StreamGraph 然后直接通过 exec env 提交。 izual 于2020年10月30日周五 下午5:04写道: > hi,Community: > > > 我们目前使用的是 flink 1.9.1 执行 SQL 任务,主要使用了以下几种接口: > 1. sqlQuery sqlUpdate: 执行表的创建、查找和写入 > 2. toAppendStream/toRetractStream:将表转换为流后,通过 DataStream.addSink(new > RichSinkFunction )写入 > 3. registerDataStream:将流注册为表,下一步使用 sqlQuery/sqlUpdate 读写该表 > > > 最后通过 env.execute() 或者 tableEnv.execute() 执行:通过 RichSinkFunction.invoke 或者 > sqlUpdate(DML) 更新到存储,这两种输出形式都可能多次调用。 > > > 看到文档里,这部分接口 [1][2] 的行为有些变化,实际使用1.11后,有几处困惑想请教: > > > 1. 如果预期混用 SQL/DataStream 的接口,我目前按照3里的介绍,使用 sqlUpdate,然后通过 tEnv.execute() > 来输出。具体的,程序设置两个输出,分别是 RichSinkFunction.invoke 以及 sqlUpdate,观察到只有 sqlUpdate > 更新了数据,RichSinkFunction 没有执行。如果希望同时输出的话,是必须将 RichSinkFunction.invoke > 的部分也都实现为 StreamTableSink 么,是否有其他成本较低的迁移方式?如果按照 1.11 区分 env/tableEnv > 的思路,这种情况怎么实现更加合理? > 2. 对于这种情况,env.getExecutionPlan 获取的只是调用 DataStream 接口的 DAG 图,如果要获取 Table > 操作流程的 DAG,应该通过 tableEnv 的哪个接口获取? > > > 1. > https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#corrected-execution-behavior-of-tableenvironmentexecute-and-streamtableenvironmentexecute-flink-16363 > 2. > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878 > 3. > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query > >
Re: Flink sql tinyint类型使用in 报错
暂时还没有 你们是什么场景需要用到隐式类型 丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午2:45写道: > 请问有没有计划加入隐式类型自动转化呢 > > > 在 2020年11月10日,下午2:35,Jark Wu 写道: > > > > 是的。Flink 目前还不支持隐式类型转换。 > > > > Best, > > Jark > > > > On Tue, 10 Nov 2020 at 14:28, hailongwang <18868816...@163.com> wrote: > > > >> Hi, > >> > >> > >> 从你的报错来看,是 in 不支持隐式 CAST。 > >> 你要么可以把 type 定义成 INT,要不把后面的值 CAST 成 TINYINT。 > >> > >> > >> Best, > >> Hailong Wang > >> > >> 在 2020-11-10 10:41:47,"丁浩浩" <18579099...@163.com> 写道: > >>> 我使用flink sql cdc取连接 mysql表的时候,当我的mysql表type这个字段类型是tinyint类型时 使用type > >> in(1,2,3,4,5)会报以下的错误,只有当我把字段类型改成int的时候才能使用in,这是符合预期的吗,当字段类型不匹配的时候 flink > >> sql不会自动转换类型吗? > >>> > >>> [ERROR] Could not execute SQL statement. Reason: > >>> org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No > >> applicable constructor/method found for actual parameters "int"; > candidates > >> are: "org.apache.flink.table.runtime.util.collections.ByteHashSet() > >> > > >
Re: flink sql LATERAL TABLE
可以提供详细信息,比如 sql 语句是啥 报错堆栈给出来 赵帅 于2020年11月10日周二 下午2:48写道: > 请教一下,flink sql,lateral table如何配合insert overwrite使用,直接select不报错,但是一旦insert > overwrite就报错
Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
拿 BatchTableEnvironment 对象作什么用处呢 ? 我们有 TableEnvironmentInternal 但是不推荐使用。 Asahi Lee <978466...@qq.com> 于2020年11月9日周一 下午5:09写道: > 是的,BatchTableEnvironment 对象 > > > > > --原始邮件-- > 发件人: > "user-zh" > < > danny0...@apache.org; > 发送时间:2020年11月9日(星期一) 中午12:34 > 收件人:"user-zh" > 主题:Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象 > > > > > BatchTableEnvironment 环境 > > > 是说 BatchTableEnvironment 对象吗 > > Asahi Lee <978466...@qq.com 于2020年11月9日周一 上午10:48写道: > > 你好! > nbsp; nbsp; nbsp; 我使用的是flink > 1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取: > // ** // BLINK BATCH QUERY // ** > import > org.apache.flink.table.api.EnvironmentSettings; import > org.apache.flink.table.api.TableEnvironment; EnvironmentSettings > bbSettings > = > > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = > > TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?
Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
> > BatchTableEnvironment 环境 是说 BatchTableEnvironment 对象吗 Asahi Lee <978466...@qq.com> 于2020年11月9日周一 上午10:48写道: > 你好! >我使用的是flink 1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取: > // ** // BLINK BATCH QUERY // ** import > org.apache.flink.table.api.EnvironmentSettings; import > org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings > = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment bbTableEnv = > TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?
Re: flink sql kafka connector with avro confluent schema registry support
支持的,参考 code https://github.com/apache/flink/pull/12919/commits 陈帅 于2020年11月3日周二 上午8:44写道: > flink sql 1.11.2 支持 confluent schema registry 下 avro格式的kafka connector吗? > 官网没找到相关资料。有的话请告知或者提供一下示例,谢谢! >
Re: flinksql 不支持 % 运算
%是非标准的 SQL 语法,不推荐使用。 Benchao Li 于2020年10月26日周一 下午9:26写道: > 1.11的话通过配置是无法实现的。可以把这个pr[1] cherry-pick到1.11的分支上编译一下来实现1.11上使用% > > [1] https://github.com/apache/flink/pull/12818 > > 夜思流年梦 于2020年10月26日周一 下午4:16写道: > > > flink 版本1.11 > > 目前flink-sql 好像不支持取余运算,会报错: > > 比如:SELECT * FROM Orders WHERE a % 2 = 0 > > Percent remainder '%' is not allowed under the current SQL conformance > > level > > > > > > 看了下flink 的issue ,已经有人碰到过了,说是要1.12版本修复 > > > > > > > > > > 想问下:如果再1.11版本,flink-sql 要怎么操作才能支持 % 运算呢? 可以通过修改配置文件来实现么?比如flink-conf.yaml > > > > -- > > Best, > Benchao Li >
Re: 请问批处理有反压嘛?
有的,反压机制借助于 runtime 的网络 buffer,和批流无关。 请叫我雷锋 <854194...@qq.com> 于2020年10月27日周二 下午8:02写道: > 如题
Re: 求教,如果想要从flinksql中提取出列级别的血缘关系,请问有什么好的方案吗
你可以了解下 Calcite 的 metadata 系统,其中有一个 metadata: RelMdColumnOrigins 可以拿到 column 的血缘,前提是你要拿到 SQL 对的关系表达式树。 Best, Danny Chan 在 2020年10月20日 +0800 PM8:43,dawangli ,写道: > 求教,如果想要从flinksql中提取出列级别的血缘关系,请问有什么好的方案吗
Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM
应该是碰到节点 cycle 引用了,导致优化 rule 一直重复重复触发,可以将 debug 日志打开,看下是哪个 rule 被频繁触发了,之前修过一个类似的问题[1],可以参考下 [1] https://issues.apache.org/jira/browse/CALCITE-3121 Best, Danny Chan 在 2020年9月23日 +0800 AM10:23,jun su ,写道: > hi godfrey, > 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的 > > godfrey he 于2020年9月23日周三 上午10:09写道: > > > Hi Jun, > > > > 可能是old planner缺少一些rule导致遇到了corner case, > > blink planner之前解过一些类似的案例。 > > > > jun su 于2020年9月23日周三 上午9:53写道: > > > > > hi godfrey, > > > > > > 刚看了下, blink应该也会用hep , 上文说错了 > > > > > > jun su 于2020年9月23日周三 上午9:19写道: > > > > > > > hi godfrey, > > > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep, > > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因 > > > > > > > > godfrey he 于2020年9月22日周二 下午8:58写道: > > > > > > > > > blink planner 有这个问题吗? > > > > > > > > > > jun su 于2020年9月22日周二 下午3:27写道: > > > > > > > > > > > hi all, > > > > > > > > > > > > 环境: flink-1.9.2 flink table planner > > > > > > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM > > > > > > > > > > > > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, > > > 导致进程OOM > > > > > > --- > > > > > > 代码: > > > > > > > > > > > > fbTableEnv.registerTableSource("source",orcTableSource) > > > > > > > > > > > > val select = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source > > ") > > > > > > > > > > > > fbTableEnv.registerTable("selectTable",select) > > > > > > > > > > > > val t1 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from > > > selectTable > > > > > > where Auth_Roles like 'a%'") > > > > > > fbTableEnv.registerTable("t1",t1) > > > > > > > > > > > > val t2 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where > > > > > > Target_UserSid= 'b'") > > > > > > fbTableEnv.registerTable("t2",t2) > > > > > > > > > > > > val t3 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where > > > > > > Thread_ID= 'c'") > > > > > > fbTableEnv.registerTable("t3",t3) > > > > > > > > > > > > val t4 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where > > > > > > access_path= 'd'") > > > > > > fbTableEnv.registerTable("t4",t4) > > > > > > > > > > > > val t5 = fbTableEnv.sqlQuery("select > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where > > > > > > action= 'e'") > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Best, > > > > > > Jun Su > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best, > > > > Jun Su > > > > > > > > > > > > > -- > > > Best, > > > Jun Su > > > > > > > > -- > Best, > Jun Su
Re: Row和RowData的区别
Row 是暴露给 DataStream 用户用的,里面可以设置 RowKind,RowData 是 Table 内部的数据结构,在一些场景序列化会有提升,使用 Flink SQL 会直接应用上 RowData,当然高级用户想直接用 RowData 也是可以的,1.11 的新版 connector API 就是将 RowData 暴露给了 connector 开发者。 Best, Danny Chan > 在 2020年9月9日,下午1:51,刘首维 写道: > > Hi all, > > > 请问`org.apache.flink.types.Row`和`org.apache.flink.table.data.RowData`的区别和联系是?
Re: 消费kafka数据乱序问题
你的 source 消费单/多 partition 数据相对 partition 来说仍然是有序的 只是 source 和下游 operator 如果存在数据 shuffle 就会破坏顺序,目前想保序,一种办法是 source 的并发和下游保持一致。 Best, Danny Chan 在 2020年9月4日 +0800 PM4:40,smq <374060...@qq.com>,写道: > 大家好 > > 现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额. > 如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。 > 这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.
Re: flink sql多层view嵌套,字段not found
这是一个已知问题,社区版本已经修复了 [1],不过还有一个后续 PR https://github.com/apache/flink/pull/13293,待 merge [1] https://issues.apache.org/jira/browse/FLINK-18750 Best, Danny Chan 在 2020年9月3日 +0800 PM6:41,Lin Hou ,写道: > Hi, > > 请教一个通过sql-client提交flink sql遇到的关于嵌套view,当嵌套第二层时,查询时会报找不到字段的问题。 > 元数据已经建好,简述如下: > > 1.建嵌套的view: > > create temporary view temp_app_impression_5min as > select > argService as arg_service, > timeLocal as time_local, > mid as mid, > vipruid as vipruid, > activity as activity, > LOWER(activityProperty) as activity_property > from > vipdwd.clean; > > create temporary view v1_1 as > select > get_json_object(activity_property, '$.page') as impression_page, > SPLIT_INDEX(good, '_', 0) as sales_no, > good, > imp.* > from > temp_app_impression_5min as imp, lateral table ( > string_split( > COALESCE( > get_json_object(activity_property, '$.goodslist'), > '' > ), > ',' > ) > ) as T(good) > where > activity = 'active'; > > 2. 描述view: > > 两个view建好没有问题, > 同时 desc v1_1 如下: > > Flink SQL> desc v1_1;root > |-- impression_page: STRING > |-- sales_no: VARCHAR(2000) > |-- good: STRING > |-- arg_service: STRING > |-- time_local: BIGINT > |-- mid: STRING > |-- vipruid: BIGINT > |-- activity: STRING > |-- activity_property: STRING > > 可见字段activity_property存在于v1_1中, > > 3.执行: > > 但是当执行: > select > * > from > v1_1; > > 出错:[ERROR] Could not execute SQL statement. Reason: > > org.apache.calcite.sql.validate.SqlValidatorException: Column > 'activity_property' not found in any table > > 一层view的时候查询没有问题,当嵌套view的时候就会报这个错,哪怕生成这个view的时候指定这个字段都不行: > > create temporary view v1_1 as > select > get_json_object(activity_property, '$.page') as impression_page, > SPLIT_INDEX(good, '_', 0) as sales_no, > good, > imp.*, > > activity_property > > from > temp_app_impression_5min as imp, lateral table ( > string_split( > COALESCE( > get_json_object(activity_property, '$.goodslist'), > '' > ), > ',' > ) > ) as T(good) > where > activity = 'active'; > > > 请问有解决办法么,或者是我们用的方式不对。 > > > 完整调用栈日志: > > org.apache.flink.table.client.gateway.SqlExecutionException: > Invalidate SQL statement. > at > org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99) > ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at > org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90) > ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at > org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) > [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > Caused by: org.apache.flink.table.api.ValidationException: SQL > validation failed. From line 3, column 84 to line 3, column 102: > Column 'activity_property' not found in any table > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) > ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at > org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55) > ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) > ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) > ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.1
Re: Flink SQL 任务乱码问题
SQL 文本是什么编码 ?有尝试过 UTF8 编码 ? Best, Danny Chan 在 2020年9月3日 +0800 PM5:04,LakeShen ,写道: > Hi 社区, > > 我的一个 Flink SQL 任务,版本为 1.10,代码中使用了中文值,具体如下: > > select xxx, case when a = 'a' then '你好' when a = 'b' then '你好呀' end as va > from xxx ; > > 然后会把这个结果输出,最后发现 va 的值乱码了,也就是中文乱码。 > > 目前有什么比较好的解决方法吗。 > > Best, > LakeShen
Re: flink-1.11 sql写ES6问题
Es connector 的包放到哪个目录下了 ? Best, Danny Chan 在 2020年9月2日 +0800 PM3:38,酷酷的浑蛋 ,写道: > Caused by: java.lang.ClassNotFoundException: > org.elasticsearch.client.RestClientBuilder > Flink sql 写入ES:总是报上面的错误,我检查了依赖并没有冲突这个类啊,而且我解压了jar,里面是有这个类的啊 > > >
Re: 【闫云鹏】Flink sql 写入es实现object嵌套形式
[ { f:f },{ g:g } ] 可否用 Array> 来表达? Best, Danny Chan 在 2020年9月2日 +0800 PM3:54,user-zh@flink.apache.org,写道: > > [ > { > f:f > },{ > g:g > } > ]
Re: flink-1.11连接hive或filesystem问题
1. 为了保证数据正确性 stream 写文件依赖了 checkpoint 机制,你可以将你的间隔时间和 checkpoint 时间保持一致 2. 按逗号分隔是说 CSV format ? Best, Danny Chan 在 2020年8月31日 +0800 PM8:53,酷酷的浑蛋 ,写道: > 1. Create hive表(...)with(...) > 我发现写入hive只能根据checkpoint去提交分区?可以按照文件大小或者间隔时间来生成吗? > > > 2. Create table (connector=filesystem,format=json) with(…) > 这种方式format只能等于json? 我怎么按照分隔符写入hdfs?
Re: flink1.11连接mysql问题
这个问题已经有 issue 在追踪了 [1] [1] https://issues.apache.org/jira/browse/FLINK-12494 Best, Danny Chan 在 2020年8月28日 +0800 PM3:02,user-zh@flink.apache.org,写道: > > CommunicationsException
Re: flink1.11时间函数
对应英文的 deterministic function 可以更好理解些 ~ Best, Danny Chan 在 2020年8月29日 +0800 PM6:23,Dream-底限 ,写道: > 哦哦,好吧,我昨天用NOW的时候直接报错告诉我这是个bug,让我提交issue,我以为这种标示的都是函数功能有问题的 > > Benchao Li 于2020年8月28日周五 下午8:01写道: > > > 不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。 > > 对应的是确定性函数,比如concat就是确定性函数,只要输入是一样的,它的返回值就永远都是一样的。 > > 这个函数是否是确定性的,会影响plan的过程,比如是否可以做express reduce,是否可以复用表达式结果等。 > > > > Dream-底限 于2020年8月28日周五 下午2:50写道: > > > > > hi > > > > > > UNIX_TIMESTAMP() > > > > > > NOW() > > > > > > 我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗 > > > > > > > > > -- > > > > Best, > > Benchao Li > >
Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
能否提供下完整的 query,方便追踪和排查 ~ Best, Danny Chan 在 2020年8月31日 +0800 AM10:58,zhuyuping <1050316...@qq.com>,写道: > 同样出现了这个问题,SQL 使用中,请问是什么原因,翻转tumble窗口当使用mapview 进行操作时候,状态不断的增长 > 好像不能清理一样,因为正常的window 窗口 窗口结束后会清理状态,现在的情况是1秒的翻转tumble窗口,满满的从最开始的1m 过一个小时变成了1g > 不断的无限增长下去 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL 有提供参数支持大小写不敏感吗?
Hi ~ 要开启大小写不敏感涉及的东西比较多,例如词法解析,catalog 以及部分访问表达式 (a.b.c 或者 a[‘f0’]),社区已经有 issue 跟进了 [1],预期在 1.12 版本可以解决。 [1] https://issues.apache.org/jira/browse/FLINK-16175 Best, Danny Chan 在 2020年8月27日 +0800 PM3:52,Tianwang Li ,写道: > 我们对用户在使用习惯了Hive之后,在写一些flink sql对时候经常碰到大小写对困扰。 > 使用对是默认对catalog。 > > ``` > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column > 'uid' not found in any table; did you mean 'Uid'? > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 28 more > > ``` > > > > -- > ** > tivanli > **
[需求调研] Stream SQL window join 支持
大家好 ~ 这里做一个 window-join[1] 的需求调研, window-join 是 Flink DataStream 上已经有的 feature. 目标是决策是否要在 SQL 上支持该特性, 例如, tumbling window join 语法可能长这样: ```sql select ... window_start, window_end from TABLE( TUMBLE( DATA => TABLE table_a, TIMECOL => DESCRIPTOR(rowtime), SIZE => INTERVAL '1' MINUTE)) tumble_a [LEFT | RIGHT | FULL OUTER] JOIN TABLE( TUMBLE( DATA => TABLE table_b, TIMECOL => DESCRIPTOR(rowtime), SIZE => INTERVAL '1' MINUTE)) tumble_b on tumble_a.col1 = tumble_b.col1 and ... ``` 目前了解到的情况是一些公司 interval join 用的比较多,window join 的 case 还是比较少的, 希望这里可以有更多的反馈。 希望您可以分享 window join 的一些使用案例,以及为什么选用 window-join (而不是 interval join)。 感谢 ~ [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html Best, Danny Chan
Re: DDL中声明主键会报类型不匹配
是的 加了 primary key constraint 后会强制将类型转成 Not nullable,这个是 primary key 的特性导致的。 Best, Danny Chan 在 2020年8月20日 +0800 PM5:19,xiao cai ,写道: > Hi: > flink版本1.11.0 connector为kafka > DDL中声明某个字段为primary key时,会报类型不匹配,去掉primary key constraint就可以正常执行。 > 把shop_id设置为 varchar not null也不行。 > > > org.apache.flink.table.api.ValidationException: Type STRING NOT NULL of table > field 'shop_id' does not match with the physical type STRING of the 'shop_id' > field of the TableSource return type. > > > SQL如下: > create table source_0 ( > `shop_id` varchar, > `user_id` bigint, > `category_id` int, > `ts` bigint, > `proc_time` as PROCTIME(), > `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, '-MM-dd > HH:mm:ss')), > watermark for event_time AS event_time, > PRIMARY KEY (shop_id, user_id) NOT ENFORCED > ) with ( > 'connector.type' = 'kafka', > > > )
Re: Flink SQL Map类型字段大小写不敏感支持
您好 现在 Flink SQL 是大小写敏感的 目前还没有计划开启大小写不敏感。 Best, Danny Chan 在 2020年8月21日 +0800 AM11:04,zilong xiao ,写道: > 如题,在业务中有遇到过在Map类型字段中有表示同一个含义但是大小写不一致的情况,比如有个Map字段 my_map,里面可能存在key > aB,ab,Ab,在SQL中取值时能否能做到大小写不敏感呢,my_map['ab'] 能取所有相关key的值
Re: Table api son schema
Flink 1.11 支持 RAW 作为 sql 类型,基于此,你可以自己扩展 SE/DE 的逻辑实现部分动态解析[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw Best, Danny Chan 在 2020年8月11日 +0800 PM3:40,Zhao,Yi(SEC) ,写道: > <> > > > 如上图,field api被标注过期。替换写法被注释掉,使用注视掉的写法会报错如下。 > > Exception in thread "main" org.apache.flink.table.api.TableException: A raw > type backed by type information has no serializable string representation. It > needs to be resolved into a proper raw type. > at > org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:97) > at org.apache.flink.table.descriptors.Schema.field(Schema.java:88) > at jobs.IpGapUserFt2.main(IpGapUserFt2.java:83) > > 不清楚有啥解决方法吗? > > > 其次,我这边鼓捣了半天,发现使用json schema貌似也没办法实现整个表的动态结构。 > 我业务中json实际如下: > { > “d”:{ > “key”: value > …. … . .. ..// 此处key动态扩展 > } > } > 我大概想法是d作为一个field,类型是map(但好像不支持map?必须用row)。用row呢,又必须指定其所有field,就不是动态扩展字段了。
Re: 关于FlinkSQL的文档中flinksql的connector和catalog划分以及hive catalog和hive connector是否必须配合使用的问题。
你好 ~ 1. 你是只文档结构吗 ?catalog 是 flink SQL 管理表元数据信息的组件,通过注册 catalog 用户可以直接访问 catalog 中的已存表,当然用户也可以通过 CREATE TABLE DDL 来创建对应的 connector 表 2. 访问 hive metastore 中的表示一定要用 hive catalog 的,如果是新建临时表(不持久化),也可以使用内置的 catalog Best, Danny Chan 在 2020年8月10日 +0800 PM8:14,Zhao,Yi(SEC) ,写道: > 1 为什么flinksql 1.11中,JDBC > Catalog通过简单的链接转给了connector,catalog和connector并不是同一个概念。我认为应该将jdbc > connectior和jdbc catalog分开放入各自目录。 > > 2 为什么flinksql1.11中,connector部分没有hive connector。而是在hive > integration部分,以及catalogs中介绍。而且在 Table API & SQL/Hive Integration/Hive Read & > Write 部分,第一句是“Using the HiveCatalog and Flink’s connector to Hive, Flink can > read and write from Hive data as an alternative to Hive’s batch > engine.”。难道不使用hivecatalog就不能读取hive数据?是这个意思嘛。感觉好像意思是hive读取特别,这种情况必须使用hive > catalog。不可以使用jdbc catalog,但使用hive connector嘛?